-rw-r--r--  hagrid-subplot.md    4
-rw-r--r--  subplot.py          28
-rw-r--r--  test.py            941
3 files changed, 966 insertions, 7 deletions
diff --git a/hagrid-subplot.md b/hagrid-subplot.md
index 8c4b3f3..aa7d203 100644
--- a/hagrid-subplot.md
+++ b/hagrid-subplot.md
@@ -37,12 +37,12 @@ given a running Hagrid system
given file well-known.pgp
when I make request GET /vks/v1/by-fingerprint/EXAMPLEFP
then response HTTP status code is 200
-then response has header content-type: image/jpeg
+then response has header content-type: application/octet-stream
then response body matches file well-known.pgp
~~~
~~~{#well-known.pgp .file}
-FIXME this should be the well-known cert in ASCII armor
+certificate
~~~
### We get an error if using lower case hexadecimal
diff --git a/subplot.py b/subplot.py
index 850864d..b015c42 100644
--- a/subplot.py
+++ b/subplot.py
@@ -1,18 +1,36 @@
+import logging
+import requests
+import urllib3
+
+urllib3.disable_warnings()
+
+
def nop(ctx):
- pass
+ ctx["base"] = "https://web"
def get(ctx, path=None):
- pass
+ r = requests.get(f"{ctx['base']}{path}", verify=False)
+ ctx["http.status"] = r.status_code
+ ctx["http.headers"] = dict(r.headers)
+ ctx["http.body"] = r.text
+ logging.debug(f"status: {r.status_code}")
+ logging.debug(f"headers: {r.headers}")
+ logging.debug(f"body: {r.text}")
def status_code(ctx, code=None):
- pass
+ assert int(ctx["http.status"]) == int(code)
def has_header(ctx, header=None, value=None):
- pass
+ logging.debug(f"wanted header: {header!r}")
+ logging.debug(f"wanted value: {value}")
+ assert header in ctx["http.headers"]
+ assert ctx["http.headers"][header] == value
def body_matches_file(ctx, filename=None):
- pass
+ with open(filename) as f:
+ data = f.read()
+ assert data == ctx["http.body"]
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..36ce928
--- /dev/null
+++ b/test.py
@@ -0,0 +1,941 @@
+#############################################################################
+# Functions that implement steps.
+
+
+#----------------------------------------------------------------------------
+# This code comes from: subplot.py
+
+import logging
+import requests
+import urllib3
+
+urllib3.disable_warnings()
+
+
+def nop(ctx):
+ ctx["base"] = "https://web"
+
+
+def get(ctx, path=None):
+ r = requests.get(f"{ctx['base']}{path}", verify=False)
+ ctx["http.status"] = r.status_code
+ ctx["http.headers"] = dict(r.headers)
+ ctx["http.body"] = r.text
+ logging.debug(f"status: {r.status_code}")
+ logging.debug(f"headers: {r.headers}")
+ logging.debug(f"body: {r.text}")
+
+
+def status_code(ctx, code=None):
+ assert int(ctx["http.status"]) == int(code)
+
+
+def has_header(ctx, header=None, value=None):
+ logging.debug(f"wanted header: {header!r}")
+ logging.debug(f"wanted value: {value}")
+ assert header in ctx["http.headers"]
+ assert ctx["http.headers"][header] == value
+
+
+def body_matches_file(ctx, filename=None):
+ with open(filename) as f:
+ data = f.read()
+ assert data == ctx["http.body"]
+
+
+#----------------------------------------------------------------------------
+# This code comes from: lib/files.py
+
+from time import strptime
+
+import logging
+import os
+import re
+import shutil
+import time
+
+
+def files_create_from_embedded(ctx, embedded_file=None):
+ files_make_directory(ctx, path=os.path.dirname(embedded_file) or ".")
+ files_create_from_embedded_with_other_name(
+ ctx, filename_on_disk=embedded_file, embedded_file=embedded_file
+ )
+
+
+def files_create_from_embedded_with_other_name(
+ ctx, filename_on_disk=None, embedded_file=None
+):
+ get_file = globals()["get_file"]
+
+ files_make_directory(ctx, path=os.path.dirname(filename_on_disk) or ".")
+ with open(filename_on_disk, "wb") as f:
+ f.write(get_file(embedded_file))
+
+
+def files_create_from_text(ctx, filename=None, text=None):
+ files_make_directory(ctx, path=os.path.dirname(filename) or ".")
+ with open(filename, "w") as f:
+ f.write(text)
+
+
+def files_make_directory(ctx, path=None):
+ path = "./" + path
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+
+def files_remove_directory(ctx, path=None):
+ path = "./" + path
+ shutil.rmtree(path)
+
+
+def files_file_exists(ctx, filename=None):
+ assert_eq = globals()["assert_eq"]
+ assert_eq(os.path.exists(filename), True)
+
+
+def files_file_does_not_exist(ctx, filename=None):
+ assert_eq = globals()["assert_eq"]
+ assert_eq(os.path.exists(filename), False)
+
+
+def files_directory_exists(ctx, path=None):
+ assert_eq = globals()["assert_eq"]
+ assert_eq(os.path.isdir(path), True)
+
+
+def files_directory_does_not_exist(ctx, path=None):
+ assert_eq = globals()["assert_eq"]
+ assert_eq(os.path.isdir(path), False)
+
+
+def files_directory_is_empty(ctx, path=None):
+ assert_eq = globals()["assert_eq"]
+ assert_eq(os.listdir(path), [])
+
+
+def files_directory_is_not_empty(ctx, path=None):
+ assert_ne = globals()["assert_ne"]
+    assert_ne(os.listdir(path), [])
+
+
+def files_only_these_exist(ctx, filenames=None):
+ assert_eq = globals()["assert_eq"]
+ filenames = filenames.replace(",", "").split()
+ assert_eq(set(os.listdir(".")), set(filenames))
+
+
+def files_file_contains(ctx, filename=None, data=None):
+ assert_eq = globals()["assert_eq"]
+ with open(filename, "rb") as f:
+ actual = f.read()
+ actual = actual.decode("UTF-8")
+ assert_eq(data in actual, True)
+
+
+def files_file_matches_regex(ctx, filename=None, regex=None):
+ assert_eq = globals()["assert_eq"]
+ with open(filename) as f:
+ content = f.read()
+ m = re.search(regex, content)
+ if m is None:
+ logging.debug("files_file_matches_regex: no match")
+        logging.debug(f"  filename: {filename}")
+ logging.debug(f" regex: {regex}")
+ logging.debug(f" content: {content}")
+ logging.debug(f" match: {m}")
+ assert_eq(bool(m), True)
+
+
+def files_match(ctx, filename1=None, filename2=None):
+ assert_eq = globals()["assert_eq"]
+ with open(filename1, "rb") as f:
+ data1 = f.read()
+ with open(filename2, "rb") as f:
+ data2 = f.read()
+ assert_eq(data1, data2)
+
+
+def files_touch_with_timestamp(ctx, filename=None, mtime=None):
+ t = strptime(mtime, "%Y-%m-%d %H:%M:%S")
+ ts = time.mktime(t)
+ _files_touch(filename, ts)
+
+
+def files_touch(ctx, filename=None):
+ _files_touch(filename, None)
+
+
+def _files_touch(filename, ts):
+ if not os.path.exists(filename):
+ open(filename, "w").close()
+ times = None
+ if ts is not None:
+ times = (ts, ts)
+ os.utime(filename, times=times)
+
+
+def files_mtime_is_recent(ctx, filename=None):
+ st = os.stat(filename)
+ age = abs(st.st_mtime - time.time())
+ assert age < 1.0
+
+
+def files_mtime_is_ancient(ctx, filename=None):
+ st = os.stat(filename)
+ age = abs(st.st_mtime - time.time())
+ year = 365 * 24 * 60 * 60
+ required = 39 * year
+ logging.debug(f"ancient? mtime={st.st_mtime} age={age} required={required}")
+ assert age > required
+
+
+def files_remember_metadata(ctx, filename=None):
+ log_value = globals()["log_value"]
+
+ meta = _files_remembered(ctx)
+ meta[filename] = _files_get_metadata(filename)
+ logging.debug("files_remember_metadata:")
+ log_value("meta", 1, meta)
+ log_value("ctx", 1, ctx.as_dict())
+
+
+# Check that current metadata of a file is as stored in the context.
+def files_has_remembered_metadata(ctx, filename=None):
+ assert_dict_eq = globals()["assert_dict_eq"]
+ log_value = globals()["log_value"]
+
+ meta = _files_remembered(ctx)
+ logging.debug("files_has_remembered_metadata:")
+ log_value("meta", 1, meta)
+ log_value("ctx", 1, ctx.as_dict())
+
+ assert_dict_eq(meta[filename], _files_get_metadata(filename))
+
+
+def files_has_different_metadata(ctx, filename=None):
+ assert_ne = globals()["assert_ne"]
+ meta = _files_remembered(ctx)
+ assert_ne(meta[filename], _files_get_metadata(filename))
+
+
+def _files_remembered(ctx):
+ ns = ctx.declare("_files")
+ return ns.get("remembered-metadata", {})
+
+
+def _files_get_metadata(filename):
+ st = os.lstat(filename)
+ keys = ["st_dev", "st_gid", "st_ino", "st_mode", "st_mtime", "st_size", "st_uid"]
+ return {key: getattr(st, key) for key in keys}
+
+
+
+
+#############################################################################
+# Scaffolding for generated test program.
+
+# import logging
+import re
+
+
+# Store context between steps.
+class Context:
+ def __init__(self):
+ self._vars = {}
+ self._ns = {}
+
+ def as_dict(self):
+ return dict(self._vars)
+
+ def get(self, key, default=None):
+ return self._vars.get(key, default)
+
+ def __getitem__(self, key):
+ return self._vars[key]
+
+ def __setitem__(self, key, value):
+ # logging.debug("Context: key {!r} set to {!r}".format(key, value))
+ self._vars[key] = value
+
+ def keys(self):
+ return self._vars.keys()
+
+ def __contains__(self, key):
+ return key in self._vars
+
+ def __delitem__(self, key):
+ del self._vars[key]
+
+ def __repr__(self):
+ return repr({"vars": self._vars, "namespaces": self._ns})
+
+ def declare(self, name):
+ if name not in self._ns:
+ self._ns[name] = NameSpace(name)
+ return self._ns[name]
+
+ def remember_value(self, name, value):
+ ns = self.declare("_values")
+ if name in ns:
+ raise KeyError(name)
+ ns[name] = value
+
+ def recall_value(self, name):
+ ns = self.declare("_values")
+ if name not in ns:
+ raise KeyError(name)
+ return ns[name]
+
+ def expand_values(self, pattern):
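+        # A hypothetical example: after remember_value("uid", "alice"),
+        # expand_values("user ${uid}") would return "user alice".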
+ parts = []
+ while pattern:
+ m = re.search(r"(?<!\$)\$\{(?P<name>\S*)\}", pattern)
+ if not m:
+ parts.append(pattern)
+ break
+ name = m.group("name")
+ if not name:
+ raise KeyError("empty name in expansion")
+ value = self.recall_value(name)
+            # Keep the text before the expansion, then the recalled value.
+            parts.append(pattern[: m.start()])
+            parts.append(value)
+ pattern = pattern[m.end() :]
+ return "".join(parts)
+
+
+class NameSpace:
+ def __init__(self, name):
+ self.name = name
+ self._dict = {}
+
+ def as_dict(self):
+ return dict(self._dict)
+
+ def get(self, key, default=None):
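+        # Behaves like dict.setdefault, except that only a non-None default is
+        # stored under the key; get(key) with no default never creates the key.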
+ if key not in self._dict:
+ if default is None:
+ return None
+ self._dict[key] = default
+ return self._dict[key]
+
+ def __setitem__(self, key, value):
+ self._dict[key] = value
+
+ def __getitem__(self, key):
+ return self._dict[key]
+
+ def keys(self):
+ return self._dict.keys()
+
+ def __contains__(self, key):
+ return key in self._dict
+
+ def __delitem__(self, key):
+ del self._dict[key]
+
+ def __repr__(self):
+ return repr(self._dict)
+
+# Decode a base64 encoded string. Result is binary or unicode string.
+
+
+import base64
+
+
+def decode_bytes(s):
+ return base64.b64decode(s)
+
+
+def decode_str(s):
+ return base64.b64decode(s).decode()
+
+# Retrieve an embedded test data file using filename.
+
+
+class Files:
+ def __init__(self):
+ self._files = {}
+
+ def set(self, filename, content):
+ self._files[filename] = content
+
+ def get(self, filename):
+ return self._files[filename]
+
+
+_files = Files()
+
+
+def store_file(filename, content):
+ _files.set(filename, content)
+
+
+def get_file(filename):
+ return _files.get(filename)
+
+# Check two values for equality and give error if they are not equal
+def assert_eq(a, b):
+ assert a == b, "expected %r == %r" % (a, b)
+
+
+# Check two values for inequality and give error if they are equal
+def assert_ne(a, b):
+ assert a != b, "expected %r != %r" % (a, b)
+
+
+# Check that two dict values are equal.
+def _assert_dict_eq(a, b):
+ for key in a:
+        assert key in b, f"expected {key} in both dicts"
+ av = a[key]
+ bv = b[key]
+ assert_eq(type(av), type(bv))
+ if isinstance(av, list):
+ _assert_list_eq(av, bv)
+ elif isinstance(av, dict):
+ _assert_dict_eq(av, bv)
+ else:
+ assert_eq(av, bv)
+ for key in b:
+        assert key in a, f"expected {key} in both dicts"
+
+
+# Check that two list values are equal
+def _assert_list_eq(a, b):
+ assert_eq(len(a), len(b))
+ for (av, bv) in zip(a, b):
+ assert_eq(type(av), type(bv))
+ if isinstance(av, list):
+ _assert_list_eq(av, bv)
+ elif isinstance(av, dict):
+ _assert_dict_eq(av, bv)
+ else:
+ assert_eq(av, bv)
+
+
+# Recursively check two dictionaries are equal
+def assert_dict_eq(a, b):
+ assert isinstance(a, dict)
+ assert isinstance(b, dict)
+ _assert_dict_eq(a, b)
+
+import logging
+import os
+import tempfile
+
+
+#############################################################################
+# Code to implement the scenarios.
+
+
+class Step:
+ def __init__(self):
+ self._kind = None
+ self._text = None
+ self._args = {}
+ self._function = None
+ self._cleanup = None
+
+ def set_kind(self, kind):
+ self._kind = kind
+
+ def set_text(self, text):
+ self._text = text
+
+ def set_arg(self, name, value):
+ self._args[name] = value
+
+ def set_function(self, function):
+ self._function = function
+
+ def set_cleanup(self, cleanup):
+ self._cleanup = cleanup
+
+ def do(self, ctx):
+ print(" step: {} {}".format(self._kind, self._text))
+ logging.info("step: {} {}".format(self._kind, self._text))
+ self._function(ctx, **self._args)
+
+ def cleanup(self, ctx):
+ if self._cleanup:
+ print(" cleanup: {} {}".format(self._kind, self._text))
+ logging.info("cleanup: {} {}".format(self._kind, self._text))
+ self._cleanup(ctx, **self._args)
+
+
+class Scenario:
+ def __init__(self, ctx):
+ self._title = None
+ self._steps = []
+ self._ctx = ctx
+ self._logged_env = False
+
+ def get_title(self):
+ return self._title
+
+ def set_title(self, title):
+ self._title = title
+
+ def append_step(self, step):
+ self._steps.append(step)
+
+ def run(self, datadir, extra_env):
+ print("scenario: {}".format(self._title))
+ logging.info("Scenario: {}".format(self._title))
+
+ scendir = tempfile.mkdtemp(dir=datadir)
+ os.chdir(scendir)
+ self._set_environment_variables_to(scendir, extra_env)
+
+ done = []
+ ctx = self._ctx
+ try:
+ for step in self._steps:
+ step.do(ctx)
+ done.append(step)
+ except Exception as e:
+ logging.error(str(e), exc_info=True)
+ for step in reversed(done):
+ step.cleanup(ctx)
+ raise
+ for step in reversed(done):
+ step.cleanup(ctx)
+
+ def _set_environment_variables_to(self, scendir, extra_env):
+ log_value = globals()["log_value"]
+
+ overrides = {
+ "SHELL": "/bin/sh",
+ "HOME": scendir,
+ "TMPDIR": scendir,
+ }
+
+ os.environ.update(overrides)
+ os.environ.update(extra_env)
+ if not self._logged_env:
+ self._logged_env = True
+ log_value("extra_env", 0, dict(extra_env))
+ log_value("os.environ", 0, dict(os.environ))
+
+import argparse
+import logging
+import os
+import random
+import shutil
+import sys
+import tarfile
+import tempfile
+
+
+class MultilineFormatter(logging.Formatter):
+ def format(self, record):
+ s = super().format(record)
+ lines = list(s.splitlines())
+        return lines.pop(0) + "".join("\n    %s" % line for line in lines)
+
+
+def indent(n):
+ return " " * n
+
+
+def log_value(msg, level, v):
+ if is_multiline_string(v):
+ logging.debug(f"{indent(level)}{msg}:")
+ log_lines(indent(level + 1), v)
+ elif isinstance(v, dict) and v:
+ # Only non-empty dictionaries
+ logging.debug(f"{indent(level)}{msg}:")
+ for k in sorted(v.keys()):
+ log_value(f"{k!r}", level + 1, v[k])
+ elif isinstance(v, list) and v:
+ # Only non-empty lists
+ logging.debug(f"{indent(level)}{msg}:")
+ for i, x in enumerate(v):
+ log_value(f"{i}", level + 1, x)
+ else:
+ logging.debug(f"{indent(level)}{msg}: {v!r}")
+
+
+def is_multiline_string(v):
+ if isinstance(v, str) and "\n" in v:
+ return True
+ elif isinstance(v, bytes) and b"\n" in v:
+ return True
+ else:
+ return False
+
+
+def log_lines(prefix, v):
+ if isinstance(v, str):
+ nl = "\n"
+ else:
+ nl = b"\n"
+ if nl in v:
+ for line in v.splitlines(keepends=True):
+ logging.debug(f"{prefix}{line!r}")
+ else:
+ logging.debug(f"{prefix}{v!r}")
+
+
+# Remember where we started from. The step functions may need to refer
+# to files there.
+srcdir = os.getcwd()
+print("srcdir", srcdir)
+
+# Create a new temporary directory and chdir there. This allows step
+# functions to create new files in the current working directory
+# without having to be so careful.
+_datadir = tempfile.mkdtemp()
+print("datadir", _datadir)
+os.chdir(_datadir)
+
+
+def parse_command_line():
+ p = argparse.ArgumentParser()
+ p.add_argument("--log")
+ p.add_argument("--env", action="append", default=[])
+ p.add_argument("--run-all", "-k", action="store_true")
+ p.add_argument("--save-on-failure")
+ p.add_argument("patterns", nargs="*")
+ return p.parse_args()
+
+
+def setup_logging(args):
+ if args.log:
+ fmt = "%(asctime)s %(levelname)s %(message)s"
+ datefmt = "%Y-%m-%d %H:%M:%S"
+ formatter = MultilineFormatter(fmt, datefmt)
+
+ filename = os.path.abspath(os.path.join(srcdir, args.log))
+ handler = logging.FileHandler(filename)
+ handler.setFormatter(formatter)
+ else:
+ handler = logging.NullHandler()
+
+ logger = logging.getLogger()
+ logger.addHandler(handler)
+ logger.setLevel(logging.DEBUG)
+
+
+def save_directory(dirname, tarname):
+ print("tarname", tarname)
+ logging.info("Saving {} to {}".format(dirname, tarname))
+ tar = tarfile.open(tarname, "w")
+ tar.add(dirname, arcname="datadir")
+ tar.close()
+
+
+def main(scenarios):
+ args = parse_command_line()
+ setup_logging(args)
+ logging.info("Test program starts")
+
+ logging.info("patterns: {}".format(args.patterns))
+ if len(args.patterns) == 0:
+ logging.info("Executing all scenarios")
+ todo = list(scenarios)
+ random.shuffle(todo)
+ else:
+ logging.info("Executing requested scenarios only: {}".format(args.patterns))
+ patterns = [arg.lower() for arg in args.patterns]
+ todo = [
+ scen
+ for scen in scenarios
+ if any(pattern in scen.get_title().lower() for pattern in patterns)
+ ]
+
+ extra_env = {}
+ for env in args.env:
+ (name, value) = env.split("=", 1)
+ extra_env[name] = value
+
+ errors = []
+ for scen in todo:
+ try:
+ scen.run(_datadir, extra_env)
+ except Exception as e:
+ logging.error(str(e), exc_info=True)
+ errors.append((scen, e))
+ if args.save_on_failure:
+ print(args.save_on_failure)
+ filename = os.path.abspath(os.path.join(srcdir, args.save_on_failure))
+ print(filename)
+ save_directory(_datadir, filename)
+ if not args.run_all:
+ raise
+
+ shutil.rmtree(_datadir)
+
+ if errors:
+ sys.stderr.write(f"ERROR: {len(errors)} scenarios failed\n")
+        for (scen, e) in errors:
+ sys.stderr.write(f" - Scenario {scen.get_title()} failed:\n {e}\n")
+ if args.log:
+ sys.stderr.write(f"Log file in {args.log}\n")
+ sys.exit(1)
+
+ print("OK, all scenarios finished successfully")
+ logging.info("OK, all scenarios finished successfully")
+
+
+
+#############################################################################
+# Test data files that were embedded in the source document. Base64
+# encoding is used to allow arbitrary data.
+
+
+# well-known.pgp
+filename = decode_str('d2VsbC1rbm93bi5wZ3A=')
+contents = decode_bytes('Y2VydGlmaWNhdGUK')
+store_file(filename, contents)
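+# (The base64 strings above decode to the filename "well-known.pgp" and the
+# contents b"certificate\n", i.e. the embedded file from hagrid-subplot.md.)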
+
+
+
+
+#############################################################################
+# Classes for individual scenarios.
+
+
+#----------------------------------------------------------------------------
+# Scenario: We can look up an existing certificate
+class Scenario_1():
+ def __init__(self):
+ ctx = Context()
+ self._scenario = Scenario(ctx)
+ self._scenario.set_title(decode_str('V2UgY2FuIGxvb2sgdXAgYW4gZXhpc3RpbmcgY2VydGlmaWNhdGU='))
+
+ # Step: a running Hagrid system
+ step = Step()
+ step.set_kind('given')
+ step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0='))
+ step.set_function(nop)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+
+ # Step: file well-known.pgp
+ step = Step()
+ step.set_kind('given')
+ step.set_text(decode_str('ZmlsZSB3ZWxsLWtub3duLnBncA=='))
+ step.set_function(files_create_from_embedded)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('ZW1iZWRkZWRfZmlsZQ==')
+ text = decode_str('d2VsbC1rbm93bi5wZ3A=')
+ step.set_arg(name, text)
+
+ # Step: I make request GET /vks/v1/by-fingerprint/EXAMPLEFP
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvRVhBTVBMRUZQ'))
+ step.set_function(get)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('cGF0aA==')
+ text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC9FWEFNUExFRlA=')
+ step.set_arg(name, text)
+
+ # Step: response HTTP status code is 200
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyAyMDA='))
+ step.set_function(status_code)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('Y29kZQ==')
+ text = decode_str('MjAw')
+ step.set_arg(name, text)
+
+ # Step: response has header content-type: application/octet-stream
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('cmVzcG9uc2UgaGFzIGhlYWRlciBjb250ZW50LXR5cGU6IGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbQ=='))
+ step.set_function(has_header)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('aGVhZGVy')
+ text = decode_str('Y29udGVudC10eXBl')
+ step.set_arg(name, text)
+ name = decode_str('dmFsdWU=')
+ text = decode_str('YXBwbGljYXRpb24vb2N0ZXQtc3RyZWFt')
+ step.set_arg(name, text)
+
+ # Step: response body matches file well-known.pgp
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('cmVzcG9uc2UgYm9keSBtYXRjaGVzIGZpbGUgd2VsbC1rbm93bi5wZ3A='))
+ step.set_function(body_matches_file)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('ZmlsZW5hbWU=')
+ text = decode_str('d2VsbC1rbm93bi5wZ3A=')
+ step.set_arg(name, text)
+
+
+ def get_title(self):
+ return self._scenario.get_title()
+
+ def run(self, datadir, extra_env):
+ self._scenario.run(datadir, extra_env)
+
+#----------------------------------------------------------------------------
+# Scenario: We get an error if using lower case hexadecimal
+class Scenario_2():
+ def __init__(self):
+ ctx = Context()
+ self._scenario = Scenario(ctx)
+ self._scenario.set_title(decode_str('V2UgZ2V0IGFuIGVycm9yIGlmIHVzaW5nIGxvd2VyIGNhc2UgaGV4YWRlY2ltYWw='))
+
+ # Step: a running Hagrid system
+ step = Step()
+ step.set_kind('given')
+ step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0='))
+ step.set_function(nop)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+
+ # Step: I make request GET /vks/v1/by-fingerprint/EXAMPLEFP
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvRVhBTVBMRUZQ'))
+ step.set_function(get)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('cGF0aA==')
+ text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC9FWEFNUExFRlA=')
+ step.set_arg(name, text)
+
+ # Step: response HTTP status code is 400
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyA0MDA='))
+ step.set_function(status_code)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('Y29kZQ==')
+ text = decode_str('NDAw')
+ step.set_arg(name, text)
+
+
+ def get_title(self):
+ return self._scenario.get_title()
+
+ def run(self, datadir, extra_env):
+ self._scenario.run(datadir, extra_env)
+
+#----------------------------------------------------------------------------
+# Scenario: We get an error for a malformed fingerprint
+class Scenario_3():
+ def __init__(self):
+ ctx = Context()
+ self._scenario = Scenario(ctx)
+ self._scenario.set_title(decode_str('V2UgZ2V0IGFuIGVycm9yIGZvciBhIG1hbGZvcm1lZCBmaW5nZXJwcmludA=='))
+
+ # Step: a running Hagrid system
+ step = Step()
+ step.set_kind('given')
+ step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0='))
+ step.set_function(nop)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+
+ # Step: I make request GET /vks/v1/by-fingerprint/IMPOSSIBLE
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvSU1QT1NTSUJMRQ=='))
+ step.set_function(get)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('cGF0aA==')
+ text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC9JTVBPU1NJQkxF')
+ step.set_arg(name, text)
+
+ # Step: response HTTP status code is 400
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyA0MDA='))
+ step.set_function(status_code)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('Y29kZQ==')
+ text = decode_str('NDAw')
+ step.set_arg(name, text)
+
+
+ def get_title(self):
+ return self._scenario.get_title()
+
+ def run(self, datadir, extra_env):
+ self._scenario.run(datadir, extra_env)
+
+#----------------------------------------------------------------------------
+# Scenario: We get an error for a missing fingerprint
+class Scenario_4():
+ def __init__(self):
+ ctx = Context()
+ self._scenario = Scenario(ctx)
+ self._scenario.set_title(decode_str('V2UgZ2V0IGFuIGVycm9yIGZvciBhIG1pc3NpbmcgZmluZ2VycHJpbnQ='))
+
+ # Step: a running Hagrid system
+ step = Step()
+ step.set_kind('given')
+ step.set_text(decode_str('YSBydW5uaW5nIEhhZ3JpZCBzeXN0ZW0='))
+ step.set_function(nop)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+
+ # Step: I make request GET /vks/v1/by-fingerprint/010203
+ step = Step()
+ step.set_kind('when')
+ step.set_text(decode_str('SSBtYWtlIHJlcXVlc3QgR0VUIC92a3MvdjEvYnktZmluZ2VycHJpbnQvMDEwMjAz'))
+ step.set_function(get)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('cGF0aA==')
+ text = decode_str('L3Zrcy92MS9ieS1maW5nZXJwcmludC8wMTAyMDM=')
+ step.set_arg(name, text)
+
+ # Step: response HTTP status code is 400
+ step = Step()
+ step.set_kind('then')
+ step.set_text(decode_str('cmVzcG9uc2UgSFRUUCBzdGF0dXMgY29kZSBpcyA0MDA='))
+ step.set_function(status_code)
+ if '':
+ step.set_cleanup()
+ self._scenario.append_step(step)
+ name = decode_str('Y29kZQ==')
+ text = decode_str('NDAw')
+ step.set_arg(name, text)
+
+
+ def get_title(self):
+ return self._scenario.get_title()
+
+ def run(self, datadir, extra_env):
+ self._scenario.run(datadir, extra_env)
+
+
+_scenarios = {
+ Scenario_1(),
+ Scenario_2(),
+ Scenario_3(),
+ Scenario_4(),
+}
+
+
+#############################################################################
+# Call main function and clean up.
+main(_scenarios)