From d68ed957f785f4e6969a213e05e4a6bbfc7c391a Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Fri, 26 Mar 2021 09:00:31 +0200 Subject: feat! rewrite code This started out as a change to re-do the command line parsing. I ended up rewriting everything, and failed to do it in a way that could be rebased into a sensible series of small commits. --- subplot/jt.py | 14 +-- subplot/jt.yaml | 8 +- subplot/runcmd.py | 252 ------------------------------------------- subplot/runcmd.yaml | 83 -------------- subplot/vendored/files.py | 194 +++++++++++++++++++++++++++++++++ subplot/vendored/files.yaml | 83 ++++++++++++++ subplot/vendored/runcmd.py | 252 +++++++++++++++++++++++++++++++++++++++++++ subplot/vendored/runcmd.yaml | 83 ++++++++++++++ 8 files changed, 624 insertions(+), 345 deletions(-) delete mode 100644 subplot/runcmd.py delete mode 100644 subplot/runcmd.yaml create mode 100644 subplot/vendored/files.py create mode 100644 subplot/vendored/files.yaml create mode 100644 subplot/vendored/runcmd.py create mode 100644 subplot/vendored/runcmd.yaml (limited to 'subplot') diff --git a/subplot/jt.py b/subplot/jt.py index 380cde4..31bceb7 100644 --- a/subplot/jt.py +++ b/subplot/jt.py @@ -2,6 +2,14 @@ import logging import os +def install_jt(ctx): + runcmd_prepend_to_path = globals()["runcmd_prepend_to_path"] + srcdir = globals()["srcdir"] + + bindir = os.path.join(srcdir, "target", "debug") + runcmd_prepend_to_path(ctx, bindir) + + def create_script(ctx, filename=None): get_file = globals()["get_file"] text = get_file(filename) @@ -50,12 +58,6 @@ def _binary(name): return os.path.join(srcdir, "target", "debug", "jt2") -def is_directory(ctx, dirname=None): - assert_eq = globals()["assert_eq"] - logging.debug("checking if %r is a directory", dirname) - assert_eq(os.path.isdir(dirname), True) - - def output_contains(ctx, pattern=None): assert_eq = globals()["assert_eq"] logging.debug("checking if %r contains", ctx["stdout"], pattern) diff --git a/subplot/jt.yaml b/subplot/jt.yaml index 6d10a3a..9c0983c 100644 --- a/subplot/jt.yaml +++ b/subplot/jt.yaml @@ -1,3 +1,6 @@ +- given: "an installed jt" + function: install_jt + - given: "an executable script {filename}" function: create_script @@ -24,9 +27,6 @@ - when: I edit draft {draftno} in {dirname} to also contain "{text}" function: edit_draft -- then: directory {dirname} exists - function: is_directory - - then: output contains "(?P.*)" regex: true function: output_contains @@ -46,5 +46,5 @@ - then: there is one journal entry in {dirname}, at {variable} function: journal_has_one_entry -- then: file <{variable}> contains "{pattern:text}" +- then: journal entry <{variable}> contains "{pattern:text}" function: file_contains diff --git a/subplot/runcmd.py b/subplot/runcmd.py deleted file mode 100644 index a2564c6..0000000 --- a/subplot/runcmd.py +++ /dev/null @@ -1,252 +0,0 @@ -import logging -import os -import re -import shlex -import subprocess - - -# -# Helper functions. -# - -# Get exit code or other stored data about the latest command run by -# runcmd_run. - - -def _runcmd_get(ctx, name): - ns = ctx.declare("_runcmd") - return ns[name] - - -def runcmd_get_exit_code(ctx): - return _runcmd_get(ctx, "exit") - - -def runcmd_get_stdout(ctx): - return _runcmd_get(ctx, "stdout") - - -def runcmd_get_stdout_raw(ctx): - return _runcmd_get(ctx, "stdout.raw") - - -def runcmd_get_stderr(ctx): - return _runcmd_get(ctx, "stderr") - - -def runcmd_get_stderr_raw(ctx): - return _runcmd_get(ctx, "stderr.raw") - - -def runcmd_get_argv(ctx): - return _runcmd_get(ctx, "argv") - - -# Run a command, given an argv and other arguments for subprocess.Popen. -# -# This is meant to be a helper function, not bound directly to a step. The -# stdout, stderr, and exit code are stored in the "_runcmd" namespace in the -# ctx context. -def runcmd_run(ctx, argv, **kwargs): - ns = ctx.declare("_runcmd") - - # The Subplot Python template empties os.environ at startup, modulo a small - # number of variables with carefully chosen values. Here, we don't need to - # care about what those variables are, but we do need to not overwrite - # them, so we just add anything in the env keyword argument, if any, to - # os.environ. - env = dict(os.environ) - for key, arg in kwargs.pop("env", {}).items(): - env[key] = arg - - pp = ns.get("path-prefix") - if pp: - env["PATH"] = pp + ":" + env["PATH"] - - logging.debug(f"runcmd_run") - logging.debug(f" argv: {argv}") - logging.debug(f" env: {env}") - p = subprocess.Popen( - argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, **kwargs - ) - stdout, stderr = p.communicate("") - ns["argv"] = argv - ns["stdout.raw"] = stdout - ns["stderr.raw"] = stderr - ns["stdout"] = stdout.decode("utf-8") - ns["stderr"] = stderr.decode("utf-8") - ns["exit"] = p.returncode - logging.debug(f" ctx: {ctx}") - logging.debug(f" ns: {ns}") - - -# Step: prepend srcdir to PATH whenever runcmd runs a command. -def runcmd_helper_srcdir_path(ctx): - srcdir = globals()["srcdir"] - runcmd_prepend_to_path(ctx, srcdir) - - -# Step: This creates a helper script. -def runcmd_helper_script(ctx, filename=None): - get_file = globals()["get_file"] - with open(filename, "wb") as f: - f.write(get_file(filename)) - - -# -# Step functions for running commands. -# - - -def runcmd_prepend_to_path(ctx, dirname=None): - ns = ctx.declare("_runcmd") - pp = ns.get("path-prefix", "") - if pp: - pp = f"{pp}:{dirname}" - else: - pp = dirname - ns["path-prefix"] = pp - - -def runcmd_step(ctx, argv0=None, args=None): - runcmd_try_to_run(ctx, argv0=argv0, args=args) - runcmd_exit_code_is_zero(ctx) - - -def runcmd_try_to_run(ctx, argv0=None, args=None): - argv = [shlex.quote(argv0)] + shlex.split(args) - runcmd_run(ctx, argv) - - -# -# Step functions for examining exit codes. -# - - -def runcmd_exit_code_is_zero(ctx): - runcmd_exit_code_is(ctx, exit=0) - - -def runcmd_exit_code_is(ctx, exit=None): - assert_eq = globals()["assert_eq"] - assert_eq(runcmd_get_exit_code(ctx), int(exit)) - - -def runcmd_exit_code_is_nonzero(ctx): - runcmd_exit_code_is_not(ctx, exit=0) - - -def runcmd_exit_code_is_not(ctx, exit=None): - assert_ne = globals()["assert_ne"] - assert_ne(runcmd_get_exit_code(ctx), int(exit)) - - -# -# Step functions and helpers for examining output in various ways. -# - - -def runcmd_stdout_is(ctx, text=None): - _runcmd_output_is(runcmd_get_stdout(ctx), text) - - -def runcmd_stdout_isnt(ctx, text=None): - _runcmd_output_isnt(runcmd_get_stdout(ctx), text) - - -def runcmd_stderr_is(ctx, text=None): - _runcmd_output_is(runcmd_get_stderr(ctx), text) - - -def runcmd_stderr_isnt(ctx, text=None): - _runcmd_output_isnt(runcmd_get_stderr(ctx), text) - - -def _runcmd_output_is(actual, wanted): - assert_eq = globals()["assert_eq"] - wanted = bytes(wanted, "utf8").decode("unicode_escape") - logging.debug("_runcmd_output_is:") - logging.debug(f" actual: {actual!r}") - logging.debug(f" wanted: {wanted!r}") - assert_eq(actual, wanted) - - -def _runcmd_output_isnt(actual, wanted): - assert_ne = globals()["assert_ne"] - wanted = bytes(wanted, "utf8").decode("unicode_escape") - logging.debug("_runcmd_output_isnt:") - logging.debug(f" actual: {actual!r}") - logging.debug(f" wanted: {wanted!r}") - assert_ne(actual, wanted) - - -def runcmd_stdout_contains(ctx, text=None): - _runcmd_output_contains(runcmd_get_stdout(ctx), text) - - -def runcmd_stdout_doesnt_contain(ctx, text=None): - _runcmd_output_doesnt_contain(runcmd_get_stdout(ctx), text) - - -def runcmd_stderr_contains(ctx, text=None): - _runcmd_output_contains(runcmd_get_stderr(ctx), text) - - -def runcmd_stderr_doesnt_contain(ctx, text=None): - _runcmd_output_doesnt_contain(runcmd_get_stderr(ctx), text) - - -def _runcmd_output_contains(actual, wanted): - assert_eq = globals()["assert_eq"] - wanted = bytes(wanted, "utf8").decode("unicode_escape") - logging.debug("_runcmd_output_contains:") - logging.debug(f" actual: {actual!r}") - logging.debug(f" wanted: {wanted!r}") - assert_eq(wanted in actual, True) - - -def _runcmd_output_doesnt_contain(actual, wanted): - assert_ne = globals()["assert_ne"] - wanted = bytes(wanted, "utf8").decode("unicode_escape") - logging.debug("_runcmd_output_doesnt_contain:") - logging.debug(f" actual: {actual!r}") - logging.debug(f" wanted: {wanted!r}") - assert_ne(wanted in actual, True) - - -def runcmd_stdout_matches_regex(ctx, regex=None): - _runcmd_output_matches_regex(runcmd_get_stdout(ctx), regex) - - -def runcmd_stdout_doesnt_match_regex(ctx, regex=None): - _runcmd_output_doesnt_match_regex(runcmd_get_stdout(ctx), regex) - - -def runcmd_stderr_matches_regex(ctx, regex=None): - _runcmd_output_matches_regex(runcmd_get_stderr(ctx), regex) - - -def runcmd_stderr_doesnt_match_regex(ctx, regex=None): - _runcmd_output_doesnt_match_regex(runcmd_get_stderr(ctx), regex) - - -def _runcmd_output_matches_regex(actual, regex): - assert_ne = globals()["assert_ne"] - r = re.compile(regex) - m = r.search(actual) - logging.debug("_runcmd_output_matches_regex:") - logging.debug(f" actual: {actual!r}") - logging.debug(f" regex: {regex!r}") - logging.debug(f" match: {m}") - assert_ne(m, None) - - -def _runcmd_output_doesnt_match_regex(actual, regex): - assert_eq = globals()["assert_eq"] - r = re.compile(regex) - m = r.search(actual) - logging.debug("_runcmd_output_doesnt_match_regex:") - logging.debug(f" actual: {actual!r}") - logging.debug(f" regex: {regex!r}") - logging.debug(f" match: {m}") - assert_eq(m, None) diff --git a/subplot/runcmd.yaml b/subplot/runcmd.yaml deleted file mode 100644 index 48dde90..0000000 --- a/subplot/runcmd.yaml +++ /dev/null @@ -1,83 +0,0 @@ -# Steps to run commands. - -- given: helper script {filename} for runcmd - function: runcmd_helper_script - -- given: srcdir is in the PATH - function: runcmd_helper_srcdir_path - -- when: I run (?P\S+)(?P.*) - regex: true - function: runcmd_step - -- when: I try to run (?P\S+)(?P.*) - regex: true - function: runcmd_try_to_run - -# Steps to examine exit code of latest command. - -- then: exit code is {exit} - function: runcmd_exit_code_is - -- then: exit code is not {exit} - function: runcmd_exit_code_is_not - -- then: command is successful - function: runcmd_exit_code_is_zero - -- then: command fails - function: runcmd_exit_code_is_nonzero - -# Steps to examine stdout/stderr for exact content. - -- then: stdout is exactly "(?P.*)" - regex: true - function: runcmd_stdout_is - -- then: "stdout isn't exactly \"(?P.*)\"" - regex: true - function: runcmd_stdout_isnt - -- then: stderr is exactly "(?P.*)" - regex: true - function: runcmd_stderr_is - -- then: "stderr isn't exactly \"(?P.*)\"" - regex: true - function: runcmd_stderr_isnt - -# Steps to examine stdout/stderr for sub-strings. - -- then: stdout contains "(?P.*)" - regex: true - function: runcmd_stdout_contains - -- then: "stdout doesn't contain \"(?P.*)\"" - regex: true - function: runcmd_stdout_doesnt_contain - -- then: stderr contains "(?P.*)" - regex: true - function: runcmd_stderr_contains - -- then: "stderr doesn't contain \"(?P.*)\"" - regex: true - function: runcmd_stderr_doesnt_contain - -# Steps to match stdout/stderr against regular expressions. - -- then: stdout matches regex (?P.*) - regex: true - function: runcmd_stdout_matches_regex - -- then: stdout doesn't match regex (?P.*) - regex: true - function: runcmd_stdout_doesnt_match_regex - -- then: stderr matches regex (?P.*) - regex: true - function: runcmd_stderr_matches_regex - -- then: stderr doesn't match regex (?P.*) - regex: true - function: runcmd_stderr_doesnt_match_regex diff --git a/subplot/vendored/files.py b/subplot/vendored/files.py new file mode 100644 index 0000000..dd5b9f8 --- /dev/null +++ b/subplot/vendored/files.py @@ -0,0 +1,194 @@ +import logging +import os +import re +import shutil +import time + + +def files_create_from_embedded(ctx, filename=None): + files_make_directory(ctx, path=os.path.dirname(filename) or ".") + files_create_from_embedded_with_other_name( + ctx, filename_on_disk=filename, embedded_filename=filename + ) + + +def files_create_from_embedded_with_other_name( + ctx, filename_on_disk=None, embedded_filename=None +): + get_file = globals()["get_file"] + + files_make_directory(ctx, path=os.path.dirname(filename_on_disk) or ".") + with open(filename_on_disk, "wb") as f: + f.write(get_file(embedded_filename)) + + +def files_create_from_text(ctx, filename=None, text=None): + files_make_directory(ctx, path=os.path.dirname(filename) or ".") + with open(filename, "w") as f: + f.write(text) + + +def files_make_directory(ctx, path=None): + path = "./" + path + if not os.path.exists(path): + os.makedirs(path) + + +def files_remove_directory(ctx, path=None): + path = "./" + path + shutil.rmtree(path) + + +def files_file_exists(ctx, filename=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.path.exists(filename), True) + + +def files_file_does_not_exist(ctx, filename=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.path.exists(filename), False) + + +def files_directory_exists(ctx, path=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.path.isdir(path), True) + + +def files_directory_does_not_exist(ctx, path=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.path.isdir(path), False) + + +def files_directory_is_empty(ctx, path=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.listdir(path), []) + + +def files_directory_is_not_empty(ctx, path=None): + assert_ne = globals()["assert_ne"] + assert_ne(os.listdir(path), False) + + +def files_only_these_exist(ctx, filenames=None): + assert_eq = globals()["assert_eq"] + filenames = filenames.replace(",", "").split() + assert_eq(set(os.listdir(".")), set(filenames)) + + +def files_file_contains(ctx, filename=None, data=None): + assert_eq = globals()["assert_eq"] + with open(filename, "rb") as f: + actual = f.read() + actual = actual.decode("UTF-8") + assert_eq(data in actual, True) + + +def files_file_matches_regex(ctx, filename=None, regex=None): + assert_eq = globals()["assert_eq"] + with open(filename) as f: + content = f.read() + m = re.search(regex, content) + if m is None: + logging.debug(f"files_file_matches_regex: no match") + logging.debug(f" filenamed: {filename}") + logging.debug(f" regex: {regex}") + logging.debug(f" content: {regex}") + logging.debug(f" match: {m}") + assert_eq(bool(m), True) + + +def files_match(ctx, filename1=None, filename2=None): + assert_eq = globals()["assert_eq"] + with open(filename1, "rb") as f: + data1 = f.read() + with open(filename2, "rb") as f: + data2 = f.read() + assert_eq(data1, data2) + + +def files_touch_with_timestamp( + ctx, + filename=None, + year=None, + month=None, + day=None, + hour=None, + minute=None, + second=None, +): + t = ( + int(year), + int(month), + int(day), + int(hour), + int(minute), + int(second), + -1, + -1, + -1, + ) + ts = time.mktime(t) + _files_touch(filename, ts) + + +def files_touch(ctx, filename=None): + _files_touch(filename, None) + + +def _files_touch(filename, ts): + if not os.path.exists(filename): + open(filename, "w").close() + times = None + if ts is not None: + times = (ts, ts) + os.utime(filename, times=times) + + +def files_mtime_is_recent(ctx, filename=None): + st = os.stat(filename) + age = abs(st.st_mtime - time.time()) + assert age < 1.0 + + +def files_mtime_is_ancient(ctx, filename=None): + st = os.stat(filename) + age = abs(st.st_mtime - time.time()) + year = 365 * 24 * 60 * 60 + required = 39 * year + logging.debug(f"ancient? mtime={st.st_mtime} age={age} required={required}") + assert age > required + + +def files_remember_metadata(ctx, filename=None): + meta = _files_remembered(ctx) + meta[filename] = _files_get_metadata(filename) + logging.debug("files_remember_metadata:") + logging.debug(f" meta: {meta}") + logging.debug(f" ctx: {ctx}") + + +# Check that current metadata of a file is as stored in the context. +def files_has_remembered_metadata(ctx, filename=None): + assert_eq = globals()["assert_eq"] + meta = _files_remembered(ctx) + logging.debug("files_has_remembered_metadata:") + logging.debug(f" meta: {meta}") + logging.debug(f" ctx: {ctx}") + assert_eq(meta[filename], _files_get_metadata(filename)) + + +def files_has_different_metadata(ctx, filename=None): + assert_ne = globals()["assert_ne"] + meta = _files_remembered(ctx) + assert_ne(meta[filename], _files_get_metadata(filename)) + + +def _files_remembered(ctx): + ns = ctx.declare("_files") + return ns.get("remembered-metadata", {}) + + +def _files_get_metadata(filename): + st = os.lstat(filename) + keys = ["st_dev", "st_gid", "st_ino", "st_mode", "st_mtime", "st_size", "st_uid"] + return {key: getattr(st, key) for key in keys} diff --git a/subplot/vendored/files.yaml b/subplot/vendored/files.yaml new file mode 100644 index 0000000..f18b8cd --- /dev/null +++ b/subplot/vendored/files.yaml @@ -0,0 +1,83 @@ +- given: file {filename} + function: files_create_from_embedded + types: + filename: file + +- given: file {filename_on_disk} from {embedded_filename} + function: files_create_from_embedded_with_other_name + types: + embedded_filename: file + +- given: file {filename} has modification time {year}-{month}-{day} {hour}:{minute}:{second} + function: files_touch_with_timestamp + +- when: I write "(?P.*)" to file (?P\S+) + regex: true + function: files_create_from_text + +- when: I remember metadata for file {filename} + function: files_remember_metadata + +- when: I touch file {filename} + function: files_touch + +- then: file {filename} exists + function: files_file_exists + +- then: file {filename} does not exist + function: files_file_does_not_exist + +- then: only files (?P.+) exist + function: files_only_these_exist + regex: true + +- then: file (?P\S+) contains "(?P.*)" + regex: true + function: files_file_contains + +- then: file (?P\S+) matches regex /(?P.*)/ + regex: true + function: files_file_matches_regex + +- then: file (?P\S+) matches regex "(?P.*)" + regex: true + function: files_file_matches_regex + +- then: files {filename1} and {filename2} match + function: files_match + +- then: file {filename} has same metadata as before + function: files_has_remembered_metadata + +- then: file {filename} has different metadata from before + function: files_has_different_metadata + +- then: file {filename} has changed from before + function: files_has_different_metadata + +- then: file {filename} has a very recent modification time + function: files_mtime_is_recent + +- then: file {filename} has a very old modification time + function: files_mtime_is_ancient + +- given: a directory {path} + function: files_make_directory + +- when: I create directory {path} + function: files_make_directory + +- when: I remove directory {path} + function: files_remove_directory + +- then: directory {path} exists + function: files_directory_exists + +- then: directory {path} does not exist + function: files_directory_does_not_exist + +- then: directory {path} is empty + function: files_directory_is_empty + +- then: directory {path} is not empty + function: files_directory_is_not_empty diff --git a/subplot/vendored/runcmd.py b/subplot/vendored/runcmd.py new file mode 100644 index 0000000..a2564c6 --- /dev/null +++ b/subplot/vendored/runcmd.py @@ -0,0 +1,252 @@ +import logging +import os +import re +import shlex +import subprocess + + +# +# Helper functions. +# + +# Get exit code or other stored data about the latest command run by +# runcmd_run. + + +def _runcmd_get(ctx, name): + ns = ctx.declare("_runcmd") + return ns[name] + + +def runcmd_get_exit_code(ctx): + return _runcmd_get(ctx, "exit") + + +def runcmd_get_stdout(ctx): + return _runcmd_get(ctx, "stdout") + + +def runcmd_get_stdout_raw(ctx): + return _runcmd_get(ctx, "stdout.raw") + + +def runcmd_get_stderr(ctx): + return _runcmd_get(ctx, "stderr") + + +def runcmd_get_stderr_raw(ctx): + return _runcmd_get(ctx, "stderr.raw") + + +def runcmd_get_argv(ctx): + return _runcmd_get(ctx, "argv") + + +# Run a command, given an argv and other arguments for subprocess.Popen. +# +# This is meant to be a helper function, not bound directly to a step. The +# stdout, stderr, and exit code are stored in the "_runcmd" namespace in the +# ctx context. +def runcmd_run(ctx, argv, **kwargs): + ns = ctx.declare("_runcmd") + + # The Subplot Python template empties os.environ at startup, modulo a small + # number of variables with carefully chosen values. Here, we don't need to + # care about what those variables are, but we do need to not overwrite + # them, so we just add anything in the env keyword argument, if any, to + # os.environ. + env = dict(os.environ) + for key, arg in kwargs.pop("env", {}).items(): + env[key] = arg + + pp = ns.get("path-prefix") + if pp: + env["PATH"] = pp + ":" + env["PATH"] + + logging.debug(f"runcmd_run") + logging.debug(f" argv: {argv}") + logging.debug(f" env: {env}") + p = subprocess.Popen( + argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, **kwargs + ) + stdout, stderr = p.communicate("") + ns["argv"] = argv + ns["stdout.raw"] = stdout + ns["stderr.raw"] = stderr + ns["stdout"] = stdout.decode("utf-8") + ns["stderr"] = stderr.decode("utf-8") + ns["exit"] = p.returncode + logging.debug(f" ctx: {ctx}") + logging.debug(f" ns: {ns}") + + +# Step: prepend srcdir to PATH whenever runcmd runs a command. +def runcmd_helper_srcdir_path(ctx): + srcdir = globals()["srcdir"] + runcmd_prepend_to_path(ctx, srcdir) + + +# Step: This creates a helper script. +def runcmd_helper_script(ctx, filename=None): + get_file = globals()["get_file"] + with open(filename, "wb") as f: + f.write(get_file(filename)) + + +# +# Step functions for running commands. +# + + +def runcmd_prepend_to_path(ctx, dirname=None): + ns = ctx.declare("_runcmd") + pp = ns.get("path-prefix", "") + if pp: + pp = f"{pp}:{dirname}" + else: + pp = dirname + ns["path-prefix"] = pp + + +def runcmd_step(ctx, argv0=None, args=None): + runcmd_try_to_run(ctx, argv0=argv0, args=args) + runcmd_exit_code_is_zero(ctx) + + +def runcmd_try_to_run(ctx, argv0=None, args=None): + argv = [shlex.quote(argv0)] + shlex.split(args) + runcmd_run(ctx, argv) + + +# +# Step functions for examining exit codes. +# + + +def runcmd_exit_code_is_zero(ctx): + runcmd_exit_code_is(ctx, exit=0) + + +def runcmd_exit_code_is(ctx, exit=None): + assert_eq = globals()["assert_eq"] + assert_eq(runcmd_get_exit_code(ctx), int(exit)) + + +def runcmd_exit_code_is_nonzero(ctx): + runcmd_exit_code_is_not(ctx, exit=0) + + +def runcmd_exit_code_is_not(ctx, exit=None): + assert_ne = globals()["assert_ne"] + assert_ne(runcmd_get_exit_code(ctx), int(exit)) + + +# +# Step functions and helpers for examining output in various ways. +# + + +def runcmd_stdout_is(ctx, text=None): + _runcmd_output_is(runcmd_get_stdout(ctx), text) + + +def runcmd_stdout_isnt(ctx, text=None): + _runcmd_output_isnt(runcmd_get_stdout(ctx), text) + + +def runcmd_stderr_is(ctx, text=None): + _runcmd_output_is(runcmd_get_stderr(ctx), text) + + +def runcmd_stderr_isnt(ctx, text=None): + _runcmd_output_isnt(runcmd_get_stderr(ctx), text) + + +def _runcmd_output_is(actual, wanted): + assert_eq = globals()["assert_eq"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_is:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_eq(actual, wanted) + + +def _runcmd_output_isnt(actual, wanted): + assert_ne = globals()["assert_ne"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_isnt:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_ne(actual, wanted) + + +def runcmd_stdout_contains(ctx, text=None): + _runcmd_output_contains(runcmd_get_stdout(ctx), text) + + +def runcmd_stdout_doesnt_contain(ctx, text=None): + _runcmd_output_doesnt_contain(runcmd_get_stdout(ctx), text) + + +def runcmd_stderr_contains(ctx, text=None): + _runcmd_output_contains(runcmd_get_stderr(ctx), text) + + +def runcmd_stderr_doesnt_contain(ctx, text=None): + _runcmd_output_doesnt_contain(runcmd_get_stderr(ctx), text) + + +def _runcmd_output_contains(actual, wanted): + assert_eq = globals()["assert_eq"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_contains:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_eq(wanted in actual, True) + + +def _runcmd_output_doesnt_contain(actual, wanted): + assert_ne = globals()["assert_ne"] + wanted = bytes(wanted, "utf8").decode("unicode_escape") + logging.debug("_runcmd_output_doesnt_contain:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" wanted: {wanted!r}") + assert_ne(wanted in actual, True) + + +def runcmd_stdout_matches_regex(ctx, regex=None): + _runcmd_output_matches_regex(runcmd_get_stdout(ctx), regex) + + +def runcmd_stdout_doesnt_match_regex(ctx, regex=None): + _runcmd_output_doesnt_match_regex(runcmd_get_stdout(ctx), regex) + + +def runcmd_stderr_matches_regex(ctx, regex=None): + _runcmd_output_matches_regex(runcmd_get_stderr(ctx), regex) + + +def runcmd_stderr_doesnt_match_regex(ctx, regex=None): + _runcmd_output_doesnt_match_regex(runcmd_get_stderr(ctx), regex) + + +def _runcmd_output_matches_regex(actual, regex): + assert_ne = globals()["assert_ne"] + r = re.compile(regex) + m = r.search(actual) + logging.debug("_runcmd_output_matches_regex:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + assert_ne(m, None) + + +def _runcmd_output_doesnt_match_regex(actual, regex): + assert_eq = globals()["assert_eq"] + r = re.compile(regex) + m = r.search(actual) + logging.debug("_runcmd_output_doesnt_match_regex:") + logging.debug(f" actual: {actual!r}") + logging.debug(f" regex: {regex!r}") + logging.debug(f" match: {m}") + assert_eq(m, None) diff --git a/subplot/vendored/runcmd.yaml b/subplot/vendored/runcmd.yaml new file mode 100644 index 0000000..48dde90 --- /dev/null +++ b/subplot/vendored/runcmd.yaml @@ -0,0 +1,83 @@ +# Steps to run commands. + +- given: helper script {filename} for runcmd + function: runcmd_helper_script + +- given: srcdir is in the PATH + function: runcmd_helper_srcdir_path + +- when: I run (?P\S+)(?P.*) + regex: true + function: runcmd_step + +- when: I try to run (?P\S+)(?P.*) + regex: true + function: runcmd_try_to_run + +# Steps to examine exit code of latest command. + +- then: exit code is {exit} + function: runcmd_exit_code_is + +- then: exit code is not {exit} + function: runcmd_exit_code_is_not + +- then: command is successful + function: runcmd_exit_code_is_zero + +- then: command fails + function: runcmd_exit_code_is_nonzero + +# Steps to examine stdout/stderr for exact content. + +- then: stdout is exactly "(?P.*)" + regex: true + function: runcmd_stdout_is + +- then: "stdout isn't exactly \"(?P.*)\"" + regex: true + function: runcmd_stdout_isnt + +- then: stderr is exactly "(?P.*)" + regex: true + function: runcmd_stderr_is + +- then: "stderr isn't exactly \"(?P.*)\"" + regex: true + function: runcmd_stderr_isnt + +# Steps to examine stdout/stderr for sub-strings. + +- then: stdout contains "(?P.*)" + regex: true + function: runcmd_stdout_contains + +- then: "stdout doesn't contain \"(?P.*)\"" + regex: true + function: runcmd_stdout_doesnt_contain + +- then: stderr contains "(?P.*)" + regex: true + function: runcmd_stderr_contains + +- then: "stderr doesn't contain \"(?P.*)\"" + regex: true + function: runcmd_stderr_doesnt_contain + +# Steps to match stdout/stderr against regular expressions. + +- then: stdout matches regex (?P.*) + regex: true + function: runcmd_stdout_matches_regex + +- then: stdout doesn't match regex (?P.*) + regex: true + function: runcmd_stdout_doesnt_match_regex + +- then: stderr matches regex (?P.*) + regex: true + function: runcmd_stderr_matches_regex + +- then: stderr doesn't match regex (?P.*) + regex: true + function: runcmd_stderr_doesnt_match_regex -- cgit v1.2.1