From 0fff5bb322aac6f57b242bcb8f96f492bf326127 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Mon, 28 Feb 2022 15:29:59 +0200 Subject: implement function, add generated test program Sponsored-by: author --- hagrid-subplot.md | 4 +- subplot.py | 28 +- test.py | 941 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 966 insertions(+), 7 deletions(-) create mode 100644 test.py diff --git a/hagrid-subplot.md b/hagrid-subplot.md index 8c4b3f3..aa7d203 100644 --- a/hagrid-subplot.md +++ b/hagrid-subplot.md @@ -37,12 +37,12 @@ given a running Hagrid system given file well-known.pgp when I make request GET /vks/v1/by-fingerprint/EXAMPLEFP then response HTTP status code is 200 -then response has header content-type: image/jpeg +then response has header content-type: application/octet-stream then response body matches file well-known.pgp ~~~ ~~~{#well-known.pgp .file} -FIXME this should be the well-known cert in ASCII armor +certificate ~~~ ### We get an error if using lower case hexadecimal diff --git a/subplot.py b/subplot.py index 850864d..b015c42 100644 --- a/subplot.py +++ b/subplot.py @@ -1,18 +1,36 @@ +import logging +import requests +import urllib3 + +urllib3.disable_warnings() + + def nop(ctx): - pass + ctx["base"] = "https://web" def get(ctx, path=None): - pass + r = requests.get(f"{ctx['base']}{path}", verify=False) + ctx["http.status"] = r.status_code + ctx["http.headers"] = dict(r.headers) + ctx["http.body"] = r.text + logging.debug(f"status: {r.status_code}") + logging.debug(f"headers: {r.headers}") + logging.debug(f"body: {r.text}") def status_code(ctx, code=None): - pass + assert int(ctx["http.status"]) == int(code) def has_header(ctx, header=None, value=None): - pass + logging.debug(f"wanted header: {header!r}") + logging.debug(f"wanted value: {value}") + assert header in ctx["http.headers"] + assert ctx["http.headers"][header] == value def body_matches_file(ctx, filename=None): - pass + with open(filename) as f: + data = f.read() + assert data == ctx["http.body"] diff --git a/test.py b/test.py new file mode 100644 index 0000000..36ce928 --- /dev/null +++ b/test.py @@ -0,0 +1,941 @@ +############################################################################# +# Functions that implement steps. + + +#---------------------------------------------------------------------------- +# This code comes from: subplot.py + +import logging +import requests +import urllib3 + +urllib3.disable_warnings() + + +def nop(ctx): + ctx["base"] = "https://web" + + +def get(ctx, path=None): + r = requests.get(f"{ctx['base']}{path}", verify=False) + ctx["http.status"] = r.status_code + ctx["http.headers"] = dict(r.headers) + ctx["http.body"] = r.text + logging.debug(f"status: {r.status_code}") + logging.debug(f"headers: {r.headers}") + logging.debug(f"body: {r.text}") + + +def status_code(ctx, code=None): + assert int(ctx["http.status"]) == int(code) + + +def has_header(ctx, header=None, value=None): + logging.debug(f"wanted header: {header!r}") + logging.debug(f"wanted value: {value}") + assert header in ctx["http.headers"] + assert ctx["http.headers"][header] == value + + +def body_matches_file(ctx, filename=None): + with open(filename) as f: + data = f.read() + assert data == ctx["http.body"] + + +#---------------------------------------------------------------------------- +# This code comes from: lib/files.py + +from time import strptime + +import logging +import os +import re +import shutil +import time + + +def files_create_from_embedded(ctx, embedded_file=None): + files_make_directory(ctx, path=os.path.dirname(embedded_file) or ".") + files_create_from_embedded_with_other_name( + ctx, filename_on_disk=embedded_file, embedded_file=embedded_file + ) + + +def files_create_from_embedded_with_other_name( + ctx, filename_on_disk=None, embedded_file=None +): + get_file = globals()["get_file"] + + files_make_directory(ctx, path=os.path.dirname(filename_on_disk) or ".") + with open(filename_on_disk, "wb") as f: + f.write(get_file(embedded_file)) + + +def files_create_from_text(ctx, filename=None, text=None): + files_make_directory(ctx, path=os.path.dirname(filename) or ".") + with open(filename, "w") as f: + f.write(text) + + +def files_make_directory(ctx, path=None): + path = "./" + path + if not os.path.exists(path): + os.makedirs(path) + + +def files_remove_directory(ctx, path=None): + path = "./" + path + shutil.rmtree(path) + + +def files_file_exists(ctx, filename=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.path.exists(filename), True) + + +def files_file_does_not_exist(ctx, filename=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.path.exists(filename), False) + + +def files_directory_exists(ctx, path=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.path.isdir(path), True) + + +def files_directory_does_not_exist(ctx, path=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.path.isdir(path), False) + + +def files_directory_is_empty(ctx, path=None): + assert_eq = globals()["assert_eq"] + assert_eq(os.listdir(path), []) + + +def files_directory_is_not_empty(ctx, path=None): + assert_ne = globals()["assert_ne"] + assert_ne(os.listdir(path), False) + + +def files_only_these_exist(ctx, filenames=None): + assert_eq = globals()["assert_eq"] + filenames = filenames.replace(",", "").split() + assert_eq(set(os.listdir(".")), set(filenames)) + + +def files_file_contains(ctx, filename=None, data=None): + assert_eq = globals()["assert_eq"] + with open(filename, "rb") as f: + actual = f.read() + actual = actual.decode("UTF-8") + assert_eq(data in actual, True) + + +def files_file_matches_regex(ctx, filename=None, regex=None): + assert_eq = globals()["assert_eq"] + with open(filename) as f: + content = f.read() + m = re.search(regex, content) + if m is None: + logging.debug("files_file_matches_regex: no match") + logging.debug(f" filenamed: {filename}") + logging.debug(f" regex: {regex}") + logging.debug(f" content: {content}") + logging.debug(f" match: {m}") + assert_eq(bool(m), True) + + +def files_match(ctx, filename1=None, filename2=None): + assert_eq = globals()["assert_eq"] + with open(filename1, "rb") as f: + data1 = f.read() + with open(filename2, "rb") as f: + data2 = f.read() + assert_eq(data1, data2) + + +def files_touch_with_timestamp(ctx, filename=None, mtime=None): + t = strptime(mtime, "%Y-%m-%d %H:%M:%S") + ts = time.mktime(t) + _files_touch(filename, ts) + + +def files_touch(ctx, filename=None): + _files_touch(filename, None) + + +def _files_touch(filename, ts): + if not os.path.exists(filename): + open(filename, "w").close() + times = None + if ts is not None: + times = (ts, ts) + os.utime(filename, times=times) + + +def files_mtime_is_recent(ctx, filename=None): + st = os.stat(filename) + age = abs(st.st_mtime - time.time()) + assert age < 1.0 + + +def files_mtime_is_ancient(ctx, filename=None): + st = os.stat(filename) + age = abs(st.st_mtime - time.time()) + year = 365 * 24 * 60 * 60 + required = 39 * year + logging.debug(f"ancient? mtime={st.st_mtime} age={age} required={required}") + assert age > required + + +def files_remember_metadata(ctx, filename=None): + log_value = globals()["log_value"] + + meta = _files_remembered(ctx) + meta[filename] = _files_get_metadata(filename) + logging.debug("files_remember_metadata:") + log_value("meta", 1, meta) + log_value("ctx", 1, ctx.as_dict()) + + +# Check that current metadata of a file is as stored in the context. +def files_has_remembered_metadata(ctx, filename=None): + assert_dict_eq = globals()["assert_dict_eq"] + log_value = globals()["log_value"] + + meta = _files_remembered(ctx) + logging.debug("files_has_remembered_metadata:") + log_value("meta", 1, meta) + log_value("ctx", 1, ctx.as_dict()) + + assert_dict_eq(meta[filename], _files_get_metadata(filename)) + + +def files_has_different_metadata(ctx, filename=None): + assert_ne = globals()["assert_ne"] + meta = _files_remembered(ctx) + assert_ne(meta[filename], _files_get_metadata(filename)) + + +def _files_remembered(ctx): + ns = ctx.declare("_files") + return ns.get("remembered-metadata", {}) + + +def _files_get_metadata(filename): + st = os.lstat(filename) + keys = ["st_dev", "st_gid", "st_ino", "st_mode", "st_mtime", "st_size", "st_uid"] + return {key: getattr(st, key) for key in keys} + + + + +############################################################################# +# Scaffolding for generated test program. + +# import logging +import re + + +# Store context between steps. +class Context: + def __init__(self): + self._vars = {} + self._ns = {} + + def as_dict(self): + return dict(self._vars) + + def get(self, key, default=None): + return self._vars.get(key, default) + + def __getitem__(self, key): + return self._vars[key] + + def __setitem__(self, key, value): + # logging.debug("Context: key {!r} set to {!r}".format(key, value)) + self._vars[key] = value + + def keys(self): + return self._vars.keys() + + def __contains__(self, key): + return key in self._vars + + def __delitem__(self, key): + del self._vars[key] + + def __repr__(self): + return repr({"vars": self._vars, "namespaces": self._ns}) + + def declare(self, name): + if name not in self._ns: + self._ns[name] = NameSpace(name) + return self._ns[name] + + def remember_value(self, name, value): + ns = self.declare("_values") + if name in ns: + raise KeyError(name) + ns[name] = value + + def recall_value(self, name): + ns = self.declare("_values") + if name not in ns: + raise KeyError(name) + return ns[name] + + def expand_values(self, pattern): + parts = [] + while pattern: + m = re.search(r"(?\S*)\}", pattern) + if not m: + parts.append(pattern) + break + name = m.group("name") + if not name: + raise KeyError("empty name in expansion") + value = self.recall_value(name) + parts.append(value) + pattern = pattern[m.end() :] + return "".join(parts) + + +class NameSpace: + def __init__(self, name): + self.name = name + self._dict = {} + + def as_dict(self): + return dict(self._dict) + + def get(self, key, default=None): + if key not in self._dict: + if default is None: + return None + self._dict[key] = default + return self._dict[key] + + def __setitem__(self, key, value): + self._dict[key] = value + + def __getitem__(self, key): + return self._dict[key] + + def keys(self): + return self._dict.keys() + + def __contains__(self, key): + return key in self._dict + + def __delitem__(self, key): + del self._dict[key] + + def __repr__(self): + return repr(self._dict) + +# Decode a base64 encoded string. Result is binary or unicode string. + + +import base64 + + +def decode_bytes(s): + return base64.b64decode(s) + + +def decode_str(s): + return base64.b64decode(s).decode() + +# Retrieve an embedded test data file using filename. + + +class Files: + def __init__(self): + self._files = {} + + def set(self, filename, content): + self._files[filename] = content + + def get(self, filename): + return self._files[filename] + + +_files = Files() + + +def store_file(filename, content): + _files.set(filename, content) + + +def get_file(filename): + return _files.get(filename) + +# Check two values for equality and give error if they are not equal +def assert_eq(a, b): + assert a == b, "expected %r == %r" % (a, b) + + +# Check two values for inequality and give error if they are equal +def assert_ne(a, b): + assert a != b, "expected %r != %r" % (a, b) + + +# Check that two dict values are equal. +def _assert_dict_eq(a, b): + for key in a: + assert key in b, f"exected {key} in both dicts" + av = a[key] + bv = b[key] + assert_eq(type(av), type(bv)) + if isinstance(av, list): + _assert_list_eq(av, bv) + elif isinstance(av, dict): + _assert_dict_eq(av, bv) + else: + assert_eq(av, bv) + for key in b: + assert key in a, f"exected {key} in both dicts" + + +# Check that two list values are equal +def _assert_list_eq(a, b): + assert_eq(len(a), len(b)) + for (av, bv) in zip(a, b): + assert_eq(type(av), type(bv)) + if isinstance(av, list): + _assert_list_eq(av, bv) + elif isinstance(av, dict): + _assert_dict_eq(av, bv) + else: + assert_eq(av, bv) + + +# Recursively check two dictionaries are equal +def assert_dict_eq(a, b): + assert isinstance(a, dict) + assert isinstance(b, dict) + _assert_dict_eq(a, b) + +import logging +import os +import tempfile + + +############################################################################# +# Code to implement the scenarios. + + +class Step: + def __init__(self): + self._kind = None + self._text = None + self._args = {} + self._function = None + self._cleanup = None + + def set_kind(self, kind): + self._kind = kind + + def set_text(self, text): + self._text = text + + def set_arg(self, name, value): + self._args[name] = value + + def set_function(self, function): + self._function = function + + def set_cleanup(self, cleanup): + self._cleanup = cleanup + + def do(self, ctx): + print(" step: {} {}".format(self._kind, self._text)) + logging.info("step: {} {}".format(self._kind, self._text)) + self._function(ctx, **self._args) + + def cleanup(self, ctx): + if self._cleanup: + print(" cleanup: {} {}".format(self._kind, self._text)) + logging.info("cleanup: {} {}".format(self._kind, self._text)) + self._cleanup(ctx, **self._args) + + +class Scenario: + def __init__(self, ctx): + self._title = None + self._steps = [] + self._ctx = ctx + self._logged_env = False + + def get_title(self): + return self._title + + def set_title(self, title): + self._title = title + + def append_step(self, step): + self._steps.append(step) + + def run(self, datadir, extra_env): + print("scenario: {}".format(self._title)) + logging.info("Scenario: {}".format(self._title)) + + scendir = tempfile.mkdtemp(dir=datadir) + os.chdir(scendir) + self._set_environment_variables_to(scendir, extra_env) + + done = [] + ctx = self._ctx + try: + for step in self._steps: + step.do(ctx) + done.append(step) + except Exception as e: + logging.error(str(e), exc_info=True) + for step in reversed(done): + step.cleanup(ctx) + raise + for step in reversed(done): + step.cleanup(ctx) + + def _set_environment_variables_to(self, scendir, extra_env): + log_value = globals()["log_value"] + + overrides = { + "SHELL": "/bin/sh", + "HOME": scendir, + "TMPDIR": scendir, + } + + os.environ.update(overrides) + os.environ.update(extra_env) + if not self._logged_env: + self._logged_env = True + log_value("extra_env", 0, dict(extra_env)) + log_value("os.environ", 0, dict(os.environ)) + +import argparse +import logging +import os +import random +import shutil +import sys +import tarfile +import tempfile + + +class MultilineFormatter(logging.Formatter): + def format(self, record): + s = super().format(record) + lines = list(s.splitlines()) + return lines.pop(0) + "\n".join(" %s" % line for line in lines) + + +def indent(n): + return " " * n + + +def log_value(msg, level, v): + if is_multiline_string(v): + logging.debug(f"{indent(level)}{msg}:") + log_lines(indent(level + 1), v) + elif isinstance(v, dict) and v: + # Only non-empty dictionaries + logging.debug(f"{indent(level)}{msg}:") + for k in sorted(v.keys()): + log_value(f"{k!r}", level + 1, v[k]) + elif isinstance(v, list) and v: + # Only non-empty lists + logging.debug(f"{indent(level)}{msg}:") + for i, x in enumerate(v): + log_value(f"{i}", level + 1, x) + else: + logging.debug(f"{indent(level)}{msg}: {v!r}") + + +def is_multiline_string(v): + if isinstance(v, str) and "\n" in v: + return True + elif isinstance(v, bytes) and b"\n" in v: + return True + else: + return False + + +def log_lines(prefix, v): + if isinstance(v, str): + nl = "\n" + else: + nl = b"\n" + if nl in v: + for line in v.splitlines(keepends=True): + logging.debug(f"{prefix}{line!r}") + else: + logging.debug(f"{prefix}{v!r}") + + +# Remember where we started from. The step functions may need to refer +# to files there. +srcdir = os.getcwd() +print("srcdir", srcdir) + +# Create a new temporary directory and chdir there. This allows step +# functions to create new files in the current working directory +# without having to be so careful. +_datadir = tempfile.mkdtemp() +print("datadir", _datadir) +os.chdir(_datadir) + + +def parse_command_line(): + p = argparse.ArgumentParser() + p.add_argument("--log") + p.add_argument("--env", action="append", default=[]) + p.add_argument("--run-all", "-k", action="store_true") + p.add_argument("--save-on-failure") + p.add_argument("patterns", nargs="*") + return p.parse_args() + + +def setup_logging(args): + if args.log: + fmt = "%(asctime)s %(levelname)s %(message)s" + datefmt = "%Y-%m-%d %H:%M:%S" + formatter = MultilineFormatter(fmt, datefmt) + + filename = os.path.abspath(os.path.join(srcdir, args.log)) + handler = logging.FileHandler(filename) + handler.setFormatter(formatter) + else: + handler = logging.NullHandler() + + logger = logging.getLogger() + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + +def save_directory(dirname, tarname): + print("tarname", tarname) + logging.info("Saving {} to {}".format(dirname, tarname)) + tar = tarfile.open(tarname, "w") + tar.add(dirname, arcname="datadir") + tar.close() + + +def main(scenarios): + args = parse_command_line() + setup_logging(args) + logging.info("Test program starts") + + logging.info("patterns: {}".format(args.patterns)) + if len(args.patterns) == 0: + logging.info("Executing all scenarios") + todo = list(scenarios) + random.shuffle(todo) + else: + logging.info("Executing requested scenarios only: {}".format(args.patterns)) + patterns = [arg.lower() for arg in args.patterns] + todo = [ + scen + for scen in scenarios + if any(pattern in scen.get_title().lower() for pattern in patterns) + ] + + extra_env = {} + for env in args.env: + (name, value) = env.split("=", 1) + extra_env[name] = value + + errors = [] + for scen in todo: + try: + scen.run(_datadir, extra_env) + except Exception as e: + logging.error(str(e), exc_info=True) + errors.append((scen, e)) + if args.save_on_failure: + print(args.save_on_failure) + filename = os.path.abspath(os.path.join(srcdir, args.save_on_failure)) + print(filename) + save_directory(_datadir, filename) + if not args.run_all: + raise + + shutil.rmtree(_datadir) + + if errors: + sys.stderr.write(f"ERROR: {len(errors)} scenarios failed\n") + for (scean, e) in errors: + sys.stderr.write(f" - Scenario {scen.get_title()} failed:\n {e}\n") + if args.log: + sys.stderr.write(f"Log file in {args.log}\n") + sys.exit(1) + + print("OK, all scenarios finished successfully") + logging.info("OK, all scenarios finished successfully") + + + +############################################################################# +# Test data files that were embedded in the source document. Base64 +# encoding is used to allow arbitrary data. + + +# well-known.pgp +filename = decode_str('d2VsbC1rbm93bi5wZ3A=') +contents = decode_bytes('Y2VydGlmaWNhdGUK') +store_file(filename, contents) + + + + +############################################################################# +# Classes for individual scenarios. + + +#---------------------------------------------------------------------------- +# Scenario: We can look up an existing certificate +class Scenario_1(): + def __init__(self): + ctx = Context() + self._scenario = Scenario(ctx) + self._scenario.set_title(decode_str('V2UgY2FuIGxvb2sgdXAgYW4gZXhpc3RpbmcgY2VydGlmaWNhdGU=')) + + # Step: a running Hagrid system + step = Step() + step.set_kind('given') + step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0=')) + step.set_function(nop) + if '': + step.set_cleanup() + self._scenario.append_step(step) + + # Step: file well-known.pgp + step = Step() + step.set_kind('given') + step.set_text(decode_str('ZmlsZSB3ZWxsLWtub3duLnBncA==')) + step.set_function(files_create_from_embedded) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('ZW1iZWRkZWRfZmlsZQ==') + text = decode_str('d2VsbC1rbm93bi5wZ3A=') + step.set_arg(name, text) + + # Step: I make request GET /vks/v1/by-fingerprint/EXAMPLEFP + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvRVhBTVBMRUZQ')) + step.set_function(get) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('cGF0aA==') + text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC9FWEFNUExFRlA=') + step.set_arg(name, text) + + # Step: response HTTP status code is 200 + step = Step() + step.set_kind('then') + step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyAyMDA=')) + step.set_function(status_code) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('Y29kZQ==') + text = decode_str('MjAw') + step.set_arg(name, text) + + # Step: response has header content-type: application/octet-stream + step = Step() + step.set_kind('then') + step.set_text(decode_str('cmVzcG9uc2UgaGFzIGhlYWRlciBjb250ZW50LXR5cGU6IGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbQ==')) + step.set_function(has_header) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('aGVhZGVy') + text = decode_str('Y29udGVudC10eXBl') + step.set_arg(name, text) + name = decode_str('dmFsdWU=') + text = decode_str('YXBwbGljYXRpb24vb2N0ZXQtc3RyZWFt') + step.set_arg(name, text) + + # Step: response body matches file well-known.pgp + step = Step() + step.set_kind('then') + step.set_text(decode_str('cmVzcG9uc2UgYm9keSBtYXRjaGVzIGZpbGUgd2VsbC1rbm93bi5wZ3A=')) + step.set_function(body_matches_file) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('ZmlsZW5hbWU=') + text = decode_str('d2VsbC1rbm93bi5wZ3A=') + step.set_arg(name, text) + + + def get_title(self): + return self._scenario.get_title() + + def run(self, datadir, extra_env): + self._scenario.run(datadir, extra_env) + +#---------------------------------------------------------------------------- +# Scenario: We get an error if using lower case hexadecimal +class Scenario_2(): + def __init__(self): + ctx = Context() + self._scenario = Scenario(ctx) + self._scenario.set_title(decode_str('V2UgZ2V0IGFuIGVycm9yIGlmIHVzaW5nIGxvd2VyIGNhc2UgaGV4YWRlY2ltYWw=')) + + # Step: a running Hagrid system + step = Step() + step.set_kind('given') + step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0=')) + step.set_function(nop) + if '': + step.set_cleanup() + self._scenario.append_step(step) + + # Step: I make request GET /vks/v1/by-fingerprint/EXAMPLEFP + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvRVhBTVBMRUZQ')) + step.set_function(get) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('cGF0aA==') + text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC9FWEFNUExFRlA=') + step.set_arg(name, text) + + # Step: response HTTP status code is 400 + step = Step() + step.set_kind('then') + step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyA0MDA=')) + step.set_function(status_code) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('Y29kZQ==') + text = decode_str('NDAw') + step.set_arg(name, text) + + + def get_title(self): + return self._scenario.get_title() + + def run(self, datadir, extra_env): + self._scenario.run(datadir, extra_env) + +#---------------------------------------------------------------------------- +# Scenario: We get an error for a malformed fingerprint +class Scenario_3(): + def __init__(self): + ctx = Context() + self._scenario = Scenario(ctx) + self._scenario.set_title(decode_str('V2UgZ2V0IGFuIGVycm9yIGZvciBhIG1hbGZvcm1lZCBmaW5nZXJwcmludA==')) + + # Step: a running Hagrid system + step = Step() + step.set_kind('given') + step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0=')) + step.set_function(nop) + if '': + step.set_cleanup() + self._scenario.append_step(step) + + # Step: I make request GET /vks/v1/by-fingerprint/IMPOSSIBLE + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvSU1QT1NTSUJMRQ==')) + step.set_function(get) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('cGF0aA==') + text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC9JTVBPU1NJQkxF') + step.set_arg(name, text) + + # Step: response HTTP status code is 400 + step = Step() + step.set_kind('then') + step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyA0MDA=')) + step.set_function(status_code) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('Y29kZQ==') + text = decode_str('NDAw') + step.set_arg(name, text) + + + def get_title(self): + return self._scenario.get_title() + + def run(self, datadir, extra_env): + self._scenario.run(datadir, extra_env) + +#---------------------------------------------------------------------------- +# Scenario: We get an error for a missing fingerprint +class Scenario_4(): + def __init__(self): + ctx = Context() + self._scenario = Scenario(ctx) + self._scenario.set_title(decode_str('V2UgZ2V0IGFuIGVycm9yIGZvciBhIG1pc3NpbmcgZmluZ2VycHJpbnQ=')) + + # Step: a running Hagrid system + step = Step() + step.set_kind('given') + step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0=')) + step.set_function(nop) + if '': + step.set_cleanup() + self._scenario.append_step(step) + + # Step: I make request GET /vks/v1/by-fingerprint/010203 + step = Step() + step.set_kind('when') + step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvMDEwMjAz')) + step.set_function(get) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('cGF0aA==') + text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC8wMTAyMDM=') + step.set_arg(name, text) + + # Step: response HTTP status code is 400 + step = Step() + step.set_kind('then') + step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyA0MDA=')) + step.set_function(status_code) + if '': + step.set_cleanup() + self._scenario.append_step(step) + name = decode_str('Y29kZQ==') + text = decode_str('NDAw') + step.set_arg(name, text) + + + def get_title(self): + return self._scenario.get_title() + + def run(self, datadir, extra_env): + self._scenario.run(datadir, extra_env) + + +_scenarios = { + Scenario_1(), + Scenario_2(), + Scenario_3(), + Scenario_4(), +} + + +############################################################################# +# Call main function and clean up. +main(_scenarios) -- cgit v1.2.1