#############################################################################
# Some helpers to make step functions simpler.

import json
import logging
import os
import re
import subprocess
import time

import yaml


# Run a subprocess, capture its output and exit code in context.
#
# ctx is a dict-like context; the keys "argv", "env", "stdout", "stderr"
# and "exit" are overwritten on every call. stdout and stderr are decoded
# as UTF-8. argv is the command as a list, env the complete environment
# for the child process.
def _run(ctx, argv, env):
    p = subprocess.Popen(
        argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env
    )
    # stdin is not a pipe, so there is no input to feed to the child.
    stdout, stderr = p.communicate()
    ctx["argv"] = argv
    ctx["env"] = env
    ctx["stdout"] = stdout.decode("utf-8")
    ctx["stderr"] = stderr.decode("utf-8")
    ctx["exit"] = p.returncode


# Return argv prefix to run contractor from source directory.
#
# NOTE(review): this comment used to say that this ignores any config
# from the user running the test program, but gets the address of the
# manager VM from CONTRACTOR_ADDRESS in the environment. Nothing here
# reads that variable; the address appears to come from the test.address
# file via _manager_address() instead -- confirm and update.
#
# srcdir is not defined in this part of the file; presumably it is
# provided by the test framework -- TODO confirm.
def _contractor():
    return [os.path.join(srcdir, "contractor")]


# Return manager address, read from the file test.address in srcdir,
# with surrounding whitespace stripped.
def _manager_address():
    filename = os.path.join(srcdir, "test.address")
    with open(filename) as f:
        addr = f.read().strip()
    logging.debug("Manager address from %s is %r", filename, addr)
    return addr


# Create a config file for Contractor. Return its filename.
#
# If the context has no "config" entry yet (or it is None), a default
# one with the manager address and verbose output is stored in the
# context first. The file is written as YAML to config.yaml in the
# current working directory.
def _create_config(ctx):
    filename = "config.yaml"
    if ctx.get("config") is None:
        ctx["config"] = {"manager_address": _manager_address(), "verbose": True}
    with open(filename, "w") as f:
        yaml.safe_dump(ctx["config"], stream=f)
    # Log what actually ended up on disk, not what we meant to write.
    with open(filename) as f:
        logging.debug("config file: %r", f.read())
    return filename


# Create dict with environment variables for running Contractor, merging
# os.environ and srcdir/test.env.
#
# test.env is read as one NAME=VALUE per line; values from the file
# override those from os.environ. Blank lines are skipped. A non-blank
# line without "=" raises ValueError.
def _create_env():
    env = dict(os.environ)
    with open(os.path.join(srcdir, "test.env")) as f:
        for line in f:
            if not line.strip():
                # Nothing to split on a blank line, e.g. at end of file.
                continue
            name, value = line.split("=", 1)
            env[name] = value.strip()
    return env


#############################################################################
# The actual step functions.


# Do nothing. Useful to have bindings that are recognized, but don't
# actually do anything.
def nop(ctx, **kwargs):
    pass


# Check that we can access the contractor VM.
# FIXME: this hardcodes some things.
def contractor_is_working(ctx):
    # Run "contractor status" with a freshly written config file and
    # require that it exits with code zero.
    config = _create_config(ctx)
    argv = _contractor() + ["-c", config, "status"]
    _run(ctx, argv, _create_env())
    assert_eq(ctx["exit"], 0)


# Create a file from the files embedded in the input document.
#
# The content comes from get_file(filename) and is written in binary
# mode to a file of the same name. get_file is not defined in this part
# of the file; presumably the test framework provides it -- TODO confirm.
def create_file(ctx, filename=None):
    with open(filename, "wb") as f:
        f.write(get_file(filename))


# Check that a file exists.
def file_exists(ctx, filename=None):
    assert_eq(os.path.exists(filename), True)


# Copy a file from srcdir to the current working directory.
def copy_file_from_srcdir(ctx, filename=None):
    # Imported here because the module's import block does not pull in
    # shutil; without this the step fails with NameError.
    import shutil

    # The "./" prefix presumably keeps an absolute filename from
    # replacing srcdir in the join -- verify against callers.
    shutil.copy(os.path.join(srcdir, "./" + filename), ".")


# Check that the subprocess we last ran ended with the expected exit
# code. The expected code is converted with int() before comparing.
def exit_code_is(ctx, exit_code=None):
    assert_eq(ctx["exit"], int(exit_code))


# Run contractor dump on the named file. The result is stored in the
# context by _run; no config file is passed.
def run_contractor_dump(ctx, filename=None):
    argv = _contractor() + ["dump", filename]
    _run(ctx, argv, _create_env())


# Run the contractor to do a build of the named file, using a freshly
# written config file. The result is stored in the context by _run; the
# exit code is not checked here.
def run_contractor_build(ctx, filename=None):
    config = _create_config(ctx)
    argv = _contractor() + ["-c", config, "build", filename]
    _run(ctx, argv, _create_env())


# Parse stdout from latest subprocess as JSON into a dict. Read the
# named YAML file, parse as a dict. Do the two dicts match?
def stdout_json_matches_yaml_file(ctx, filename=None):
    dict1 = json.loads(ctx["stdout"])
    with open(filename) as f:
        dict2 = yaml.safe_load(f)
    assert_eq(dict1, dict2)