############################################################################# # Functions that implement steps. #---------------------------------------------------------------------------- # This code comes from: subplot.py import logging import requests import urllib3 urllib3.disable_warnings() def nop(ctx): ctx["base"] = "https://web" def get(ctx, path=None): r = requests.get(f"{ctx['base']}{path}", verify=False) ctx["http.status"] = r.status_code ctx["http.headers"] = dict(r.headers) ctx["http.body"] = r.text logging.debug(f"status: {r.status_code}") logging.debug(f"headers: {r.headers}") logging.debug(f"body: {r.text}") def status_code(ctx, code=None): assert int(ctx["http.status"]) == int(code) def has_header(ctx, header=None, value=None): logging.debug(f"wanted header: {header!r}") logging.debug(f"wanted value: {value}") assert header in ctx["http.headers"] assert ctx["http.headers"][header] == value def body_matches_file(ctx, filename=None): with open(filename) as f: data = f.read() assert data == ctx["http.body"] #---------------------------------------------------------------------------- # This code comes from: lib/files.py from time import strptime import logging import os import re import shutil import time def files_create_from_embedded(ctx, embedded_file=None): files_make_directory(ctx, path=os.path.dirname(embedded_file) or ".") files_create_from_embedded_with_other_name( ctx, filename_on_disk=embedded_file, embedded_file=embedded_file ) def files_create_from_embedded_with_other_name( ctx, filename_on_disk=None, embedded_file=None ): get_file = globals()["get_file"] files_make_directory(ctx, path=os.path.dirname(filename_on_disk) or ".") with open(filename_on_disk, "wb") as f: f.write(get_file(embedded_file)) def files_create_from_text(ctx, filename=None, text=None): files_make_directory(ctx, path=os.path.dirname(filename) or ".") with open(filename, "w") as f: f.write(text) def files_make_directory(ctx, path=None): path = "./" + path if not os.path.exists(path): os.makedirs(path) def files_remove_directory(ctx, path=None): path = "./" + path shutil.rmtree(path) def files_file_exists(ctx, filename=None): assert_eq = globals()["assert_eq"] assert_eq(os.path.exists(filename), True) def files_file_does_not_exist(ctx, filename=None): assert_eq = globals()["assert_eq"] assert_eq(os.path.exists(filename), False) def files_directory_exists(ctx, path=None): assert_eq = globals()["assert_eq"] assert_eq(os.path.isdir(path), True) def files_directory_does_not_exist(ctx, path=None): assert_eq = globals()["assert_eq"] assert_eq(os.path.isdir(path), False) def files_directory_is_empty(ctx, path=None): assert_eq = globals()["assert_eq"] assert_eq(os.listdir(path), []) def files_directory_is_not_empty(ctx, path=None): assert_ne = globals()["assert_ne"] assert_ne(os.listdir(path), False) def files_only_these_exist(ctx, filenames=None): assert_eq = globals()["assert_eq"] filenames = filenames.replace(",", "").split() assert_eq(set(os.listdir(".")), set(filenames)) def files_file_contains(ctx, filename=None, data=None): assert_eq = globals()["assert_eq"] with open(filename, "rb") as f: actual = f.read() actual = actual.decode("UTF-8") assert_eq(data in actual, True) def files_file_matches_regex(ctx, filename=None, regex=None): assert_eq = globals()["assert_eq"] with open(filename) as f: content = f.read() m = re.search(regex, content) if m is None: logging.debug("files_file_matches_regex: no match") logging.debug(f" filenamed: {filename}") logging.debug(f" regex: {regex}") logging.debug(f" content: {content}") logging.debug(f" match: {m}") assert_eq(bool(m), True) def files_match(ctx, filename1=None, filename2=None): assert_eq = globals()["assert_eq"] with open(filename1, "rb") as f: data1 = f.read() with open(filename2, "rb") as f: data2 = f.read() assert_eq(data1, data2) def files_touch_with_timestamp(ctx, filename=None, mtime=None): t = strptime(mtime, "%Y-%m-%d %H:%M:%S") ts = time.mktime(t) _files_touch(filename, ts) def files_touch(ctx, filename=None): _files_touch(filename, None) def _files_touch(filename, ts): if not os.path.exists(filename): open(filename, "w").close() times = None if ts is not None: times = (ts, ts) os.utime(filename, times=times) def files_mtime_is_recent(ctx, filename=None): st = os.stat(filename) age = abs(st.st_mtime - time.time()) assert age < 1.0 def files_mtime_is_ancient(ctx, filename=None): st = os.stat(filename) age = abs(st.st_mtime - time.time()) year = 365 * 24 * 60 * 60 required = 39 * year logging.debug(f"ancient? mtime={st.st_mtime} age={age} required={required}") assert age > required def files_remember_metadata(ctx, filename=None): log_value = globals()["log_value"] meta = _files_remembered(ctx) meta[filename] = _files_get_metadata(filename) logging.debug("files_remember_metadata:") log_value("meta", 1, meta) log_value("ctx", 1, ctx.as_dict()) # Check that current metadata of a file is as stored in the context. def files_has_remembered_metadata(ctx, filename=None): assert_dict_eq = globals()["assert_dict_eq"] log_value = globals()["log_value"] meta = _files_remembered(ctx) logging.debug("files_has_remembered_metadata:") log_value("meta", 1, meta) log_value("ctx", 1, ctx.as_dict()) assert_dict_eq(meta[filename], _files_get_metadata(filename)) def files_has_different_metadata(ctx, filename=None): assert_ne = globals()["assert_ne"] meta = _files_remembered(ctx) assert_ne(meta[filename], _files_get_metadata(filename)) def _files_remembered(ctx): ns = ctx.declare("_files") return ns.get("remembered-metadata", {}) def _files_get_metadata(filename): st = os.lstat(filename) keys = ["st_dev", "st_gid", "st_ino", "st_mode", "st_mtime", "st_size", "st_uid"] return {key: getattr(st, key) for key in keys} ############################################################################# # Scaffolding for generated test program. # import logging import re # Store context between steps. class Context: def __init__(self): self._vars = {} self._ns = {} def as_dict(self): return dict(self._vars) def get(self, key, default=None): return self._vars.get(key, default) def __getitem__(self, key): return self._vars[key] def __setitem__(self, key, value): # logging.debug("Context: key {!r} set to {!r}".format(key, value)) self._vars[key] = value def keys(self): return self._vars.keys() def __contains__(self, key): return key in self._vars def __delitem__(self, key): del self._vars[key] def __repr__(self): return repr({"vars": self._vars, "namespaces": self._ns}) def declare(self, name): if name not in self._ns: self._ns[name] = NameSpace(name) return self._ns[name] def remember_value(self, name, value): ns = self.declare("_values") if name in ns: raise KeyError(name) ns[name] = value def recall_value(self, name): ns = self.declare("_values") if name not in ns: raise KeyError(name) return ns[name] def expand_values(self, pattern): parts = [] while pattern: m = re.search(r"(?\S*)\}", pattern) if not m: parts.append(pattern) break name = m.group("name") if not name: raise KeyError("empty name in expansion") value = self.recall_value(name) parts.append(value) pattern = pattern[m.end() :] return "".join(parts) class NameSpace: def __init__(self, name): self.name = name self._dict = {} def as_dict(self): return dict(self._dict) def get(self, key, default=None): if key not in self._dict: if default is None: return None self._dict[key] = default return self._dict[key] def __setitem__(self, key, value): self._dict[key] = value def __getitem__(self, key): return self._dict[key] def keys(self): return self._dict.keys() def __contains__(self, key): return key in self._dict def __delitem__(self, key): del self._dict[key] def __repr__(self): return repr(self._dict) # Decode a base64 encoded string. Result is binary or unicode string. import base64 def decode_bytes(s): return base64.b64decode(s) def decode_str(s): return base64.b64decode(s).decode() # Retrieve an embedded test data file using filename. class Files: def __init__(self): self._files = {} def set(self, filename, content): self._files[filename] = content def get(self, filename): return self._files[filename] _files = Files() def store_file(filename, content): _files.set(filename, content) def get_file(filename): return _files.get(filename) # Check two values for equality and give error if they are not equal def assert_eq(a, b): assert a == b, "expected %r == %r" % (a, b) # Check two values for inequality and give error if they are equal def assert_ne(a, b): assert a != b, "expected %r != %r" % (a, b) # Check that two dict values are equal. def _assert_dict_eq(a, b): for key in a: assert key in b, f"exected {key} in both dicts" av = a[key] bv = b[key] assert_eq(type(av), type(bv)) if isinstance(av, list): _assert_list_eq(av, bv) elif isinstance(av, dict): _assert_dict_eq(av, bv) else: assert_eq(av, bv) for key in b: assert key in a, f"exected {key} in both dicts" # Check that two list values are equal def _assert_list_eq(a, b): assert_eq(len(a), len(b)) for (av, bv) in zip(a, b): assert_eq(type(av), type(bv)) if isinstance(av, list): _assert_list_eq(av, bv) elif isinstance(av, dict): _assert_dict_eq(av, bv) else: assert_eq(av, bv) # Recursively check two dictionaries are equal def assert_dict_eq(a, b): assert isinstance(a, dict) assert isinstance(b, dict) _assert_dict_eq(a, b) import logging import os import tempfile ############################################################################# # Code to implement the scenarios. class Step: def __init__(self): self._kind = None self._text = None self._args = {} self._function = None self._cleanup = None def set_kind(self, kind): self._kind = kind def set_text(self, text): self._text = text def set_arg(self, name, value): self._args[name] = value def set_function(self, function): self._function = function def set_cleanup(self, cleanup): self._cleanup = cleanup def do(self, ctx): print(" step: {} {}".format(self._kind, self._text)) logging.info("step: {} {}".format(self._kind, self._text)) self._function(ctx, **self._args) def cleanup(self, ctx): if self._cleanup: print(" cleanup: {} {}".format(self._kind, self._text)) logging.info("cleanup: {} {}".format(self._kind, self._text)) self._cleanup(ctx, **self._args) class Scenario: def __init__(self, ctx): self._title = None self._steps = [] self._ctx = ctx self._logged_env = False def get_title(self): return self._title def set_title(self, title): self._title = title def append_step(self, step): self._steps.append(step) def run(self, datadir, extra_env): print("scenario: {}".format(self._title)) logging.info("Scenario: {}".format(self._title)) scendir = tempfile.mkdtemp(dir=datadir) os.chdir(scendir) self._set_environment_variables_to(scendir, extra_env) done = [] ctx = self._ctx try: for step in self._steps: step.do(ctx) done.append(step) except Exception as e: logging.error(str(e), exc_info=True) for step in reversed(done): step.cleanup(ctx) raise for step in reversed(done): step.cleanup(ctx) def _set_environment_variables_to(self, scendir, extra_env): log_value = globals()["log_value"] overrides = { "SHELL": "/bin/sh", "HOME": scendir, "TMPDIR": scendir, } os.environ.update(overrides) os.environ.update(extra_env) if not self._logged_env: self._logged_env = True log_value("extra_env", 0, dict(extra_env)) log_value("os.environ", 0, dict(os.environ)) import argparse import logging import os import random import shutil import sys import tarfile import tempfile class MultilineFormatter(logging.Formatter): def format(self, record): s = super().format(record) lines = list(s.splitlines()) return lines.pop(0) + "\n".join(" %s" % line for line in lines) def indent(n): return " " * n def log_value(msg, level, v): if is_multiline_string(v): logging.debug(f"{indent(level)}{msg}:") log_lines(indent(level + 1), v) elif isinstance(v, dict) and v: # Only non-empty dictionaries logging.debug(f"{indent(level)}{msg}:") for k in sorted(v.keys()): log_value(f"{k!r}", level + 1, v[k]) elif isinstance(v, list) and v: # Only non-empty lists logging.debug(f"{indent(level)}{msg}:") for i, x in enumerate(v): log_value(f"{i}", level + 1, x) else: logging.debug(f"{indent(level)}{msg}: {v!r}") def is_multiline_string(v): if isinstance(v, str) and "\n" in v: return True elif isinstance(v, bytes) and b"\n" in v: return True else: return False def log_lines(prefix, v): if isinstance(v, str): nl = "\n" else: nl = b"\n" if nl in v: for line in v.splitlines(keepends=True): logging.debug(f"{prefix}{line!r}") else: logging.debug(f"{prefix}{v!r}") # Remember where we started from. The step functions may need to refer # to files there. srcdir = os.getcwd() print("srcdir", srcdir) # Create a new temporary directory and chdir there. This allows step # functions to create new files in the current working directory # without having to be so careful. _datadir = tempfile.mkdtemp() print("datadir", _datadir) os.chdir(_datadir) def parse_command_line(): p = argparse.ArgumentParser() p.add_argument("--log") p.add_argument("--env", action="append", default=[]) p.add_argument("--run-all", "-k", action="store_true") p.add_argument("--save-on-failure") p.add_argument("patterns", nargs="*") return p.parse_args() def setup_logging(args): if args.log: fmt = "%(asctime)s %(levelname)s %(message)s" datefmt = "%Y-%m-%d %H:%M:%S" formatter = MultilineFormatter(fmt, datefmt) filename = os.path.abspath(os.path.join(srcdir, args.log)) handler = logging.FileHandler(filename) handler.setFormatter(formatter) else: handler = logging.NullHandler() logger = logging.getLogger() logger.addHandler(handler) logger.setLevel(logging.DEBUG) def save_directory(dirname, tarname): print("tarname", tarname) logging.info("Saving {} to {}".format(dirname, tarname)) tar = tarfile.open(tarname, "w") tar.add(dirname, arcname="datadir") tar.close() def main(scenarios): args = parse_command_line() setup_logging(args) logging.info("Test program starts") logging.info("patterns: {}".format(args.patterns)) if len(args.patterns) == 0: logging.info("Executing all scenarios") todo = list(scenarios) random.shuffle(todo) else: logging.info("Executing requested scenarios only: {}".format(args.patterns)) patterns = [arg.lower() for arg in args.patterns] todo = [ scen for scen in scenarios if any(pattern in scen.get_title().lower() for pattern in patterns) ] extra_env = {} for env in args.env: (name, value) = env.split("=", 1) extra_env[name] = value errors = [] for scen in todo: try: scen.run(_datadir, extra_env) except Exception as e: logging.error(str(e), exc_info=True) errors.append((scen, e)) if args.save_on_failure: print(args.save_on_failure) filename = os.path.abspath(os.path.join(srcdir, args.save_on_failure)) print(filename) save_directory(_datadir, filename) if not args.run_all: raise shutil.rmtree(_datadir) if errors: sys.stderr.write(f"ERROR: {len(errors)} scenarios failed\n") for (scean, e) in errors: sys.stderr.write(f" - Scenario {scen.get_title()} failed:\n {e}\n") if args.log: sys.stderr.write(f"Log file in {args.log}\n") sys.exit(1) print("OK, all scenarios finished successfully") logging.info("OK, all scenarios finished successfully") ############################################################################# # Test data files that were embedded in the source document. Base64 # encoding is used to allow arbitrary data. # well-known.pgp filename = decode_str('d2VsbC1rbm93bi5wZ3A=') contents = decode_bytes('Y2VydGlmaWNhdGUK') store_file(filename, contents) ############################################################################# # Classes for individual scenarios. #---------------------------------------------------------------------------- # Scenario: We can look up an existing certificate class Scenario_1(): def __init__(self): ctx = Context() self._scenario = Scenario(ctx) self._scenario.set_title(decode_str('V2UgY2FuIGxvb2sgdXAgYW4gZXhpc3RpbmcgY2VydGlmaWNhdGU=')) # Step: a running Hagrid system step = Step() step.set_kind('given') step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0=')) step.set_function(nop) if '': step.set_cleanup() self._scenario.append_step(step) # Step: file well-known.pgp step = Step() step.set_kind('given') step.set_text(decode_str('ZmlsZSB3ZWxsLWtub3duLnBncA==')) step.set_function(files_create_from_embedded) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('ZW1iZWRkZWRfZmlsZQ==') text = decode_str('d2VsbC1rbm93bi5wZ3A=') step.set_arg(name, text) # Step: I make request GET /vks/v1/by-fingerprint/EXAMPLEFP step = Step() step.set_kind('when') step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvRVhBTVBMRUZQ')) step.set_function(get) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('cGF0aA==') text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC9FWEFNUExFRlA=') step.set_arg(name, text) # Step: response HTTP status code is 200 step = Step() step.set_kind('then') step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyAyMDA=')) step.set_function(status_code) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('Y29kZQ==') text = decode_str('MjAw') step.set_arg(name, text) # Step: response has header content-type: application/octet-stream step = Step() step.set_kind('then') step.set_text(decode_str('cmVzcG9uc2UgaGFzIGhlYWRlciBjb250ZW50LXR5cGU6IGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbQ==')) step.set_function(has_header) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('aGVhZGVy') text = decode_str('Y29udGVudC10eXBl') step.set_arg(name, text) name = decode_str('dmFsdWU=') text = decode_str('YXBwbGljYXRpb24vb2N0ZXQtc3RyZWFt') step.set_arg(name, text) # Step: response body matches file well-known.pgp step = Step() step.set_kind('then') step.set_text(decode_str('cmVzcG9uc2UgYm9keSBtYXRjaGVzIGZpbGUgd2VsbC1rbm93bi5wZ3A=')) step.set_function(body_matches_file) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('ZmlsZW5hbWU=') text = decode_str('d2VsbC1rbm93bi5wZ3A=') step.set_arg(name, text) def get_title(self): return self._scenario.get_title() def run(self, datadir, extra_env): self._scenario.run(datadir, extra_env) #---------------------------------------------------------------------------- # Scenario: We get an error if using lower case hexadecimal class Scenario_2(): def __init__(self): ctx = Context() self._scenario = Scenario(ctx) self._scenario.set_title(decode_str('V2UgZ2V0IGFuIGVycm9yIGlmIHVzaW5nIGxvd2VyIGNhc2UgaGV4YWRlY2ltYWw=')) # Step: a running Hagrid system step = Step() step.set_kind('given') step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0=')) step.set_function(nop) if '': step.set_cleanup() self._scenario.append_step(step) # Step: I make request GET /vks/v1/by-fingerprint/EXAMPLEFP step = Step() step.set_kind('when') step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvRVhBTVBMRUZQ')) step.set_function(get) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('cGF0aA==') text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC9FWEFNUExFRlA=') step.set_arg(name, text) # Step: response HTTP status code is 400 step = Step() step.set_kind('then') step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyA0MDA=')) step.set_function(status_code) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('Y29kZQ==') text = decode_str('NDAw') step.set_arg(name, text) def get_title(self): return self._scenario.get_title() def run(self, datadir, extra_env): self._scenario.run(datadir, extra_env) #---------------------------------------------------------------------------- # Scenario: We get an error for a malformed fingerprint class Scenario_3(): def __init__(self): ctx = Context() self._scenario = Scenario(ctx) self._scenario.set_title(decode_str('V2UgZ2V0IGFuIGVycm9yIGZvciBhIG1hbGZvcm1lZCBmaW5nZXJwcmludA==')) # Step: a running Hagrid system step = Step() step.set_kind('given') step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0=')) step.set_function(nop) if '': step.set_cleanup() self._scenario.append_step(step) # Step: I make request GET /vks/v1/by-fingerprint/IMPOSSIBLE step = Step() step.set_kind('when') step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvSU1QT1NTSUJMRQ==')) step.set_function(get) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('cGF0aA==') text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC9JTVBPU1NJQkxF') step.set_arg(name, text) # Step: response HTTP status code is 400 step = Step() step.set_kind('then') step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyA0MDA=')) step.set_function(status_code) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('Y29kZQ==') text = decode_str('NDAw') step.set_arg(name, text) def get_title(self): return self._scenario.get_title() def run(self, datadir, extra_env): self._scenario.run(datadir, extra_env) #---------------------------------------------------------------------------- # Scenario: We get an error for a missing fingerprint class Scenario_4(): def __init__(self): ctx = Context() self._scenario = Scenario(ctx) self._scenario.set_title(decode_str('V2UgZ2V0IGFuIGVycm9yIGZvciBhIG1pc3NpbmcgZmluZ2VycHJpbnQ=')) # Step: a running Hagrid system step = Step() step.set_kind('given') step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0=')) step.set_function(nop) if '': step.set_cleanup() self._scenario.append_step(step) # Step: I make request GET /vks/v1/by-fingerprint/010203 step = Step() step.set_kind('when') step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvMDEwMjAz')) step.set_function(get) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('cGF0aA==') text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC8wMTAyMDM=') step.set_arg(name, text) # Step: response HTTP status code is 400 step = Step() step.set_kind('then') step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyA0MDA=')) step.set_function(status_code) if '': step.set_cleanup() self._scenario.append_step(step) name = decode_str('Y29kZQ==') text = decode_str('NDAw') step.set_arg(name, text) def get_title(self): return self._scenario.get_title() def run(self, datadir, extra_env): self._scenario.run(datadir, extra_env) _scenarios = { Scenario_1(), Scenario_2(), Scenario_3(), Scenario_4(), } ############################################################################# # Call main function and clean up. main(_scenarios)