#!/usr/bin/env python3
"""Run builds inside a worker VM that is hosted on a manager VM.

All remote work is done by running ``ssh`` and ``rsync`` as local
subprocesses.  Commands for the manager are sent over one SSH hop; commands
for the worker are sent to the manager, which in turn runs ``ssh`` to the
worker.  Each remote step is a small ``RemoteExecution`` object that knows
how to render itself as a shell snippet.
"""

import argparse
import json
import logging
import os
import re
import shlex
import sys
import time
import subprocess
from subprocess import PIPE, STDOUT

import yaml


# Default configuration files.
DEFAULT_CONFIGS = {os.path.expanduser("~/.config/contractor/config.yaml")}

# The disk image file on the manager VM for the workspace disk.
WS_IMG = "/home/manager/workspace.img"
WS_SIZE = "20G"
WS_MNT = "/mnt"

# The device on the worker for the workspace disk.
WORKER_WS_DEV = "vdb"

# The worker VM image file on manager VM.
WORKER_IMG = "worker.img"

# The temporary worker VM image file, used while worker VM is running.
TEMP_IMG = "temp.img"


class ExecResult:
    """Outcome of one subprocess run: captured output and exit code."""

    def __init__(self, stdout, stderr, exit_code):
        self.stdout = stdout
        self.stderr = stderr
        self.exit_code = exit_code
        logging.debug("RESULT: {} {!r} {!r}".format(stdout, stderr, exit_code))

    def failed(self):
        """Return True if the process exited with a non-zero code."""
        return self.exit_code != 0


def ssh(target, port, argv, quiet=False):
    """Run ``argv`` on ``target`` via the local ``ssh`` client.

    Every element of ``argv`` is shell-quoted, because the remote side
    re-parses the command line with a shell.  When ``quiet`` is true the
    combined stdout/stderr is captured and returned in the result;
    otherwise it goes to this process's stdout and the captured output is
    ``None``.

    Returns an ``ExecResult``.
    """
    ssh_argv = ["ssh", "-p", str(port), "--", target]
    argv = ssh_argv + [shlex.quote(arg) for arg in argv]
    logging.info("SSH: {!r}".format(argv))
    stdout = None
    if quiet:
        stdout = PIPE
    logging.debug("EXEC: {!r} stdout={!r} stderr={!r}".format(argv, stdout, STDOUT))
    # stderr is always merged into stdout, so ``err`` below is always None.
    p = subprocess.Popen(argv, stderr=STDOUT, stdout=stdout)
    out, err = p.communicate()
    logging.debug("EXIT: {}".format(p.returncode))
    logging.debug("OUTPUT:\n{}".format(out))
    logging.debug("STDERR:\n{}".format(err))
    return ExecResult(out, err, p.returncode)


def rsync(filename, target, port):
    """Copy ``filename`` to ``target`` with rsync over SSH on ``port``.

    Files missing from the source are deleted at the target, except
    ``lost+found``.  Output is not captured.  Returns an ``ExecResult``.
    """
    argv = [
        "rsync",
        "-essh -p{}".format(port),
        "-aHSs",
        "--delete",
        "--exclude=lost+found",
        "--",
        filename,
        target,
    ]
    logging.info("RSYNC: {!r}".format(argv))
    p = subprocess.Popen(argv, stderr=STDOUT, stdout=None)
    out, err = p.communicate()
    return ExecResult(out, err, p.returncode)


class RemoteServer:
    """Runs sequences of ``RemoteExecution`` steps on the manager over SSH.

    ``verbose``, if given, is a callable taking one message string; it is
    told which steps are about to be executed.
    """

    def __init__(self, ssh_destination, ssh_port, verbose=None):
        self._dest = ssh_destination
        self._port = ssh_port
        self._where = "manager: {}".format(ssh_destination)
        self._verbose = verbose

    def quietly(self, *execs):
        """Run the steps, capturing their output in the result."""
        return self._execute(*execs, quiet=True)

    def verbosely(self, *execs):
        """Run the steps, letting their output through to stdout."""
        return self._execute(*execs, quiet=False)

    def _msg(self, execs):
        """Report the steps about to run via the verbose callback, if any."""
        if self._verbose is not None:
            self._verbose("Executing steps on {}".format(self._where))
            for e in execs:
                m = e.msg() or e.__class__.__name__
                self._verbose("  - " + m)

    def _argv(self, execs):
        """Build one ``sh -euc`` command line that runs all steps in order."""
        self._msg(execs)
        snippets = [e.as_shell() for e in execs]
        return ["sh", "-euc", "; ".join(snippets)]

    def _execute(self, *execs, quiet=None):
        assert quiet is not None
        return ssh(self._dest, self._port, self._argv(execs), quiet=quiet)


class OnWorker(RemoteServer):
    """Runs steps on the worker by asking the manager to ssh to it."""

    def __init__(self, manager, ssh_port, worker, verbose=None):
        super().__init__(manager, ssh_port, verbose=verbose)
        self._where = "worker: {}".format(worker)
        # Command prefix executed on the manager to reach the worker.
        self._prefix = ["ssh", worker, "--"]

    def _execute(self, *execs, quiet=None):
        assert quiet is not None
        # The command crosses two SSH hops, so it needs two layers of shell
        # quoting: one here, and one applied by ssh().
        argv = [shlex.quote(a) for a in self._argv(execs)]
        return ssh(self._dest, self._port, self._prefix + argv, quiet=quiet)


class RemoteExecution:
    """Base class for one step to be executed on a remote host."""

    def msg(self):
        """Return a human-readable description of the step, or None."""
        return None

    def argv(self):
        """Return the command to run as a list of strings."""
        raise NotImplementedError()

    def as_shell(self):
        """Return the command as a single, safely quoted shell snippet."""
        return " ".join(shlex.quote(arg) for arg in self.argv())


class MayFail(RemoteExecution):
    """A step whose failure must not abort the surrounding ``sh -e`` script."""

    def as_shell(self):
        return super().as_shell() + "|| true"


class TrueCmd(RemoteExecution):
    """Trivial command, used to check that the remote host is reachable."""

    def msg(self):
        return "can we run true?"

    def argv(self):
        return ["true"]


class Ansible(RemoteExecution):
    """Write an inventory and a playbook, then run ansible-playbook on them."""

    def __init__(self, playbook, worker_ip):
        self._playbook = playbook
        self._worker_ip = worker_ip

    def msg(self):
        return "running Ansible playbook on worker"

    def argv(self):
        # The here-document terminators must start in column one, so the
        # script is deliberately not indented.
        script = """
cat > hosts << EOF
[all]
worker ansible_ssh_host={ip}
EOF
cat > worker.yaml << EOF
{playbook}
EOF
ansible-playbook -i hosts worker.yaml
""".format(
            ip=self._worker_ip, playbook=yaml.safe_dump(self._playbook)
        )
        x = ["sh", "-c", script]
        logging.debug("Ansible: {!r}".format(x))
        return x


class Chdir(RemoteExecution):
    """Change directory; affects later steps in the same sequence."""

    def __init__(self, pathname):
        self._pathname = pathname

    def msg(self):
        return "cd {}".format(self._pathname)

    def argv(self):
        return ["cd", self._pathname]


class Build(RemoteExecution):
    """Run the build shell snippet from the build specification."""

    def __init__(self, shell):
        self._shell = shell

    def msg(self):
        return "building"

    def argv(self):
        return ["sh", "-euxc", self._shell]


def virsh(*args):
    """Return a virsh command line for the system libvirt connection."""
    return ["virsh", "-c", "qemu:///system"] + list(args)


class DestroyWorkerVM(MayFail):
    """Destroy the worker VM; failure is ignored."""

    def msg(self):
        return "destroying worker VM"

    def argv(self):
        return virsh("destroy", "worker")


class UndefineWorkerVM(MayFail):
    """Undefine the worker VM; failure is ignored."""

    def msg(self):
        return "undefining worker VM"

    def argv(self):
        return virsh("undefine", "worker")


class ShutdownWorkerVM(RemoteExecution):
    """Request a shutdown and wait until the worker is no longer listed."""

    def msg(self):
        return "shutting down worker VM cleanly"

    def argv(self):
        return [
            "sh",
            "-c",
            """
virsh -c qemu:///system shutdown worker
while virsh -c qemu:///system list --name | grep worker
do
    sleep 0
done
""",
        ]


class CopyWorkerImage(RemoteExecution):
    """Replace the temporary worker image with a copy of the uploaded one."""

    def msg(self):
        return "copying image file for new worker VM"

    def argv(self):
        temp = shlex.quote(TEMP_IMG)
        worker = shlex.quote(WORKER_IMG)
        script = "rm -f {temp}; cp {worker} {temp}".format(temp=temp, worker=worker)
        return ["sh", "-c", script]


class StartGuestNetworking(MayFail):
    """Start the libvirt "default" network; failure is ignored."""

    def msg(self):
        return "starting guest network"

    def argv(self):
        return virsh("net-start", "default")


class GetUID(RemoteExecution):
    """Print the numeric user ID of the remote user."""

    def msg(self):
        return "get UID on manager"

    def argv(self):
        return ["id", "-u"]


class GetGID(RemoteExecution):
    """Print the numeric group ID of the remote user."""

    def msg(self):
        return "get GID of on manager"

    def argv(self):
        return ["id", "-g"]


class RemoveWS(RemoteExecution):
    """Delete the workspace disk image."""

    def msg(self):
        return "remove workspace image on manager"

    def argv(self):
        return ["rm", "-f", WS_IMG]


class CreateWS(RemoteExecution):
    """Create an empty raw workspace disk image of WS_SIZE."""

    def msg(self):
        return "creating workspace on manager"

    def argv(self):
        return ["qemu-img", "create", "-q", "-f", "raw", WS_IMG, WS_SIZE]


class MkfsWS(RemoteExecution):
    """Create an ext4 file system in the workspace disk image."""

    def msg(self):
        return "mkfs workspace on manager"

    def argv(self):
        return ["sudo", "mkfs", "-t", "ext4", "-q", WS_IMG]


class MountWS(RemoteExecution):
    """Loop-mount the workspace disk image at WS_MNT."""

    def msg(self):
        return "mounting workspace on manager"

    def argv(self):
        return ["sudo", "mount", "-oloop", WS_IMG, WS_MNT]


class MountWSonWorker(RemoteExecution):
    """Mount the attached workspace device at /workspace."""

    def msg(self):
        return "mounting workspace on worker"

    def argv(self):
        return ["sudo", "mount", "/dev/{}".format(WORKER_WS_DEV), "/workspace"]


class TryUnmountWS(MayFail):
    """Unmount the workspace image; failure is ignored."""

    def msg(self):
        return "trying to unmount workspace on manager"

    def argv(self):
        return ["sudo", "umount", "--quiet", WS_IMG]


class UnmountWS(RemoteExecution):
    """Unmount the workspace image; failure aborts the sequence."""

    def msg(self):
        return "unmounting workspace on manager"

    def argv(self):
        return ["sudo", "umount", "--quiet", WS_IMG]


class ChownWS(RemoteExecution):
    """Recursively give the mounted workspace to the given uid and gid."""

    def __init__(self, uid, gid):
        self.uid = uid
        self.gid = gid

    def msg(self):
        return "set ownerships on workspace"

    def argv(self):
        return ["sudo", "chown", "-R", "{}:{}".format(self.uid, self.gid), WS_MNT]


class Mkdir(RemoteExecution):
    """Create a directory with the given owner, group and octal mode."""

    def __init__(self, pathname, owner="root", group="root", mode=0o755):
        self._argv = [
            "sudo",
            "install",
            "-d",
            "-o",
            str(owner),
            "-g",
            str(group),
            "-m",
            "{:o}".format(mode),
            pathname,
        ]
        self._pathname = pathname

    def msg(self):
        return "create {}".format(self._pathname)

    def argv(self):
        return self._argv


class CreateWorkerVM(RemoteExecution):
    """Create and start the worker VM from the temporary image.

    The worker gets one CPU fewer than ``/proc/cpuinfo`` lists (but never
    fewer than one), and MemTotal minus 2097152 (presumably KiB, i.e.
    2 GiB) of memory, or all of MemTotal if that would leave nothing.
    """

    def msg(self):
        return "creating worker VM"

    def argv(self):
        # Shell arithmetic expansion is used instead of expr(1): expr exits
        # with status 1 when its result is 0, which would abort this
        # "sh -e" script, e.g. on a manager with a single CPU.
        # Braces are doubled where they must survive str.format().
        script = """
n="$(grep -c '^processor' /proc/cpuinfo)"
if [ "$n" -gt 1 ]
then
    n="$((n - 1))"
fi
mem="$(awk '/MemTotal/ {{ print $2 }}' /proc/meminfo)"
smaller="$((mem - 2097152))"
if [ "$smaller" -le 0 ]
then
    smaller="$mem"
fi
mem="$((smaller / 1024))"
virt-install \\
  --connect=qemu:///system \\
  --quiet \\
  --name=worker \\
  --memory="$mem" \\
  --vcpus="$n" \\
  --cpu=host \\
  --import \\
  --os-variant=debian9 \\
  --disk=path={img} \\
  --network=network=default \\
  --graphics=spice \\
  --noautoconsole
""".format(
            img=TEMP_IMG
        )
        return ["sh", "-euxc", script]


class AttachWS(RemoteExecution):
    """Attach the workspace disk image to the running worker VM."""

    def msg(self):
        return "attach workspace disk to worker"

    def argv(self):
        return virsh(
            "--quiet",
            "attach-disk",
            "worker",
            WS_IMG,
            WORKER_WS_DEV,
            "--targetbus",
            "virtio",
            "--live",
        )


class WorkerIP(RemoteExecution):
    """Wait until the worker answers SSH, then print its IP address.

    The address is taken from the last entry of the libvirt dnsmasq status
    file and printed in the form that ``parse_worker_ip`` understands.
    """

    def msg(self):
        return "get worker IP"

    def argv(self):
        return [
            "sh",
            "-euc",
            """
status=/var/lib/libvirt/dnsmasq/virbr0.status
while true
do
    ip="$(jq -r '.[-1]["ip-address"]' "$status")"
    ssh-keygen -R "$ip" 2> /dev/null > /dev/null || true
    if ssh "worker@$ip" true 2> /dev/null
    then
        break
    fi
done
echo "worker-ip:: $ip ::"
""",
        ]


def parse_worker_ip(output):
    """Extract the worker IP address from the output of ``WorkerIP``.

    ``output`` is the captured bytes.  Returns the dotted-quad address as a
    string, or ``None`` if there is no output or no marker line in it.
    """
    if not output:
        return None
    output = output.decode("UTF8")
    # MULTILINE lets the marker be found even when later steps in the same
    # sequence print something after it.
    m = re.search(
        r"worker-ip:: (?P<ip>\d+\.\d+\.\d+\.\d+) ::$", output, re.MULTILINE
    )
    if m:
        return m.group("ip")
    return None


class Find(RemoteExecution):
    """List everything under a directory with ``find -ls``."""

    def __init__(self, dirname):
        self._dirname = dirname

    def argv(self):
        return ["sudo", "find", self._dirname, "-ls"]


class BuildSpec:
    """Parsed build specification.

    Required keys are ``worker-image``, ``source`` and ``build``;
    ``workspace`` defaults to an empty string and ``ansible`` to an empty
    list.  Path-like values have ``~`` expanded.

    Raises ``SpecMissingKey`` if a required key is absent or null.
    """

    def __init__(self, yaml_text):
        # An empty YAML document loads as None; treat it as an empty spec so
        # the problem is reported as a missing key.
        spec = yaml.safe_load(yaml_text) or {}
        self._image = os.path.expanduser(self._get(spec, "worker-image"))
        self._source = os.path.expanduser(self._get(spec, "source"))
        self._workspace = os.path.expanduser(self._get(spec, "workspace", ""))
        self._ansible = self._get(spec, "ansible", [])
        self._build = self._get(spec, "build")

    def worker_image(self):
        return self._image

    def ansible(self):
        return self._ansible

    def source(self):
        return self._source

    def workspace(self):
        return self._workspace

    def build(self):
        return self._build

    def as_dict(self):
        """Return the specification as a plain dict, keyed as in the file."""
        return {
            "worker-image": self.worker_image(),
            "ansible": self.ansible(),
            "source": self.source(),
            "workspace": self.workspace(),
            "build": self.build(),
        }

    def _get(self, spec, key, default=None):
        """Return ``spec[key]`` or ``default``; raise if the result is None."""
        v = spec.get(key, default)
        if v is None:
            raise SpecMissingKey(key)
        return v


class SpecMissingKey(Exception):
    """Raised when the build specification lacks a required key."""

    def __init__(self, key):
        super().__init__(
            "Build specification is missing required key {!r}".format(key)
        )


class Timer:
    """Context manager that reports how long its body took.

    ``report`` is called once, on exit, with a line giving the duration in
    seconds and ``title``.  Exceptions from the body are not suppressed.
    """

    def __init__(self, report, title):
        self._report = report
        self._title = title
        self._prev = None

    def __enter__(self):
        self._prev = time.time()

    def __exit__(self, exctype, exc, tb):
        now = time.time()
        duration = now - self._prev
        self._prev = now
        self._report("time: {:.1f} s {}\n".format(duration, self._title))
        return False


def load_build_spec(filename):
    """Read ``filename`` and return it as a ``BuildSpec``."""
    with open(filename) as f:
        return BuildSpec(f.read())


def error(msg):
    """Write an error message to stderr and to the log."""
    sys.stderr.write("ERROR: {}\n".format(msg))
    logging.error("ERROR: {}".format(msg))


def verbose(args, msg):
    """Log ``msg``, and also print it if ``args.verbose`` is set."""
    logging.info(msg)
    if args.verbose:
        print(msg)


def manager_destination(args):
    """Return ``("user@address", port)`` for the manager VM."""
    user = args.manager_user
    addr = args.manager_address
    port = args.manager_port
    return "{}@{}".format(user, addr), port


def upload_worker_image(vrb, filename, dest, port):
    """Upload the local worker image to the manager; exit on failure."""
    vrb("uploading to manager local worker image {}".format(filename))
    target = "{}:{}".format(dest, WORKER_IMG)
    if rsync(filename, target, port).failed():
        error("could not upload image to worker")
        sys.exit(1)


def sync_to_workspace(vrb, frm, dest, port, subdir):
    """Sync local directory ``frm`` into ``subdir`` of the mounted workspace.

    Exits the program if rsync fails.
    """
    destdir = "{}/{}".format(WS_MNT, subdir)
    vrb("syncing local {} to manager {}".format(frm, destdir))
    er = rsync(f"{frm}/.", f"{dest}:{destdir}/.", port)
    if er.failed():
        error("Failed to rsync saved workspace to worker")
        sys.exit(1)


def sync_from_workspace(vrb, dest, port, ws):
    """Sync the mounted workspace on the manager to local directory ``ws``.

    ``ws`` is created if needed.  Exits the program if rsync fails.
    """
    vrb("syncing manager {!r} to local {!r} (port {!r})".format(WS_MNT, ws, port))
    os.makedirs(ws, exist_ok=True)
    er = rsync(f"{dest}:{WS_MNT}/.", f"{ws}/.", port)
    if er.failed():
        error("Failed to rsync workspace from worker")
        sys.exit(1)


def exec_sequence(how, *execs):
    """Run ``execs`` via ``how``; exit the program if the sequence fails.

    Returns the ``ExecResult`` on success.
    """
    er = how(*execs)
    if er.failed():
        error("Failed to do that, giving up")
        sys.exit(1)
    return er


def exec_quietly(manager, *execs):
    """Run ``execs`` with output captured; exit on failure."""
    return exec_sequence(manager.quietly, *execs)


def exec_verbosely(manager, *execs):
    """Run ``execs`` with output shown; exit on failure."""
    return exec_sequence(manager.verbosely, *execs)


def cmd_dump(args):
    """Subcommand: print the parsed build specification as JSON."""
    bs = load_build_spec(args.spec)
    sys.stdout.write("{}\n".format(json.dumps(bs.as_dict(), indent=4)))


def cmd_provision(args):
    """Subcommand: run the ``manager.yml`` playbook against the manager VM."""
    ssh_opts = [
        "ControlMaster=auto",
        "ControlPersist=60s",
        "StrictHostKeyChecking=accept-new",
        "UserKnownHostsFile=/dev/null",
    ]
    env = dict(os.environ)
    env["ANSIBLE_SSH_ARGS"] = " ".join(f"-o{opt}" for opt in ssh_opts)
    argv = [
        "ansible-playbook",
        "-i",
        "hosts",
        "manager.yml",
        f"-eansible_ssh_host={args.manager_address}",
        f"-eansible_ssh_port={args.manager_port}",
    ]
    subprocess.check_call(argv, env=env)


def cmd_status(args):
    """Subcommand: check that a trivial command can be run on the manager."""
    dest, port = manager_destination(args)
    verbose(args, "manager VM is {}:{}".format(dest, port))
    manager = RemoteServer(dest, port)
    if manager.quietly(TrueCmd()).failed():
        error("Manager VM is NOT available")
        sys.exit(1)
    verbose(args, "manager VM is available")


def cmd_build(args):
    """Subcommand: run a complete build according to a build specification.

    Uploads the worker image, starts the worker VM, prepares the workspace
    disk on the manager, attaches it to the worker, optionally runs Ansible,
    runs the build, shuts the worker down and saves the workspace.  Exits
    with status 1 if any step or the build itself fails.
    """

    def vrb(msg):
        verbose(args, msg)

    vrb("building according to {}".format(args.spec))
    bs = load_build_spec(args.spec)
    dest, port = manager_destination(args)
    vrb("manager is at {} (port {})".format(dest, port))

    manager = RemoteServer(dest, port, verbose=vrb)

    with Timer(vrb, "complete-run"):
        with Timer(vrb, "upload-worker-image"):
            upload_worker_image(vrb, bs.worker_image(), dest, port)

        # Do the minimum needed to start worker VM. The VM takes a
        # while to boot and we can do other things while that
        # happens.
        with Timer(vrb, "start-worker"):
            execs = [
                DestroyWorkerVM(),
                UndefineWorkerVM(),
                CopyWorkerImage(),
                StartGuestNetworking(),
                CreateWorkerVM(),
            ]
            exec_quietly(manager, *execs)

        with Timer(vrb, "get-manager-uid"):
            er = exec_quietly(manager, GetUID())
            manager_uid = int(er.stdout)

        with Timer(vrb, "get-manager-gid"):
            er = exec_quietly(manager, GetGID())
            manager_gid = int(er.stdout)

        with Timer(vrb, "create-workspace"):
            execs = [
                TryUnmountWS(),
                RemoveWS(),
                CreateWS(),
                MkfsWS(),
                MountWS(),
                ChownWS(manager_uid, manager_gid),
            ]
            exec_quietly(manager, *execs)

        with Timer(vrb, "upload-saved-workspace"):
            ws = bs.workspace()
            if ws:
                os.makedirs(ws, exist_ok=True)
                sync_to_workspace(vrb, ws, dest, port, ".")

        with Timer(vrb, "upload-source"):
            exec_quietly(
                manager,
                Mkdir("{}/src".format(WS_MNT), owner=manager_uid, group=manager_gid),
            )
            src = bs.source()
            sync_to_workspace(vrb, src, dest, port, "src")

        with Timer(vrb, "wait-for-worker-to-be-available"):
            execs = [UnmountWS(), WorkerIP(), AttachWS()]
            er = exec_quietly(manager, *execs)
            worker_ip = parse_worker_ip(er.stdout)
            if worker_ip is None:
                # Without an address every later step would target
                # "worker@None"; stop here with a clear message instead.
                error("could not determine worker IP address")
                sys.exit(1)

        with Timer(vrb, "prepare-workspace-worker"):
            worker_dest = "worker@{}".format(worker_ip)
            vrb("worker is at {} (via manager)".format(worker_dest))
            worker = OnWorker(dest, port, worker_dest, verbose=vrb)
            exec_quietly(worker, Mkdir("/workspace"), MountWSonWorker())

        with Timer(vrb, "prepare-worker-with-ansible"):
            ansible = bs.ansible()
            if ansible:
                exec_verbosely(manager, Ansible(ansible, worker_ip))

        with Timer(vrb, "build"):
            execs = [Chdir("/workspace/src"), Build(bs.build())]
            # A failed build must not stop us from shutting the worker down
            # and saving the workspace, so only remember the outcome here.
            build_failed = worker.verbosely(*execs).failed()

        with Timer(vrb, "shutdown-worker"):
            execs = [ShutdownWorkerVM(), MountWS(), ChownWS(manager_uid, manager_gid)]
            exec_quietly(manager, *execs)

        with Timer(vrb, "save-workspace"):
            if ws:
                vrb("saving workspace to {}".format(ws))
                sync_from_workspace(vrb, dest, port, ws)

    if build_failed:
        error("build FAILED")
        sys.exit(1)

    vrb("build finished OK")


def setup_logging(args):
    """Send log messages to ``args.log`` if set, otherwise discard them."""
    if args.log:
        fmt = "%(asctime)s %(levelname)s %(message)s"
        datefmt = "%Y-%m-%d %H:%M:%S"
        formatter = logging.Formatter(fmt, datefmt)

        handler = logging.FileHandler(args.log)
        handler.setFormatter(formatter)
    else:
        handler = logging.NullHandler()

    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)


def load_default_config(args):
    """Load every default configuration file that exists into ``args``."""
    for filename in DEFAULT_CONFIGS:
        if os.path.exists(filename):
            load_config(filename, args)


def load_config(filename, args):
    """Copy recognised settings from a YAML config file onto ``args``.

    Recognised keys are ``manager_address``, ``manager_port``,
    ``manager_user``, ``verbose`` and ``log`` (``~`` is expanded in
    ``log``).  Other keys are ignored.
    """
    with open(filename) as f:
        # An empty config file loads as None; treat it as "no settings".
        config = yaml.safe_load(f) or {}

    # Maps each recognised key to a converter, or None to use the value
    # unchanged.
    converters = {
        "manager_address": None,
        "manager_port": None,
        "manager_user": None,
        "verbose": None,
        "log": os.path.expanduser,
    }

    for key, convert in converters.items():
        if key in config:
            value = config[key]
            if convert is not None:
                value = convert(value)
            setattr(args, key, value)


def _add_manager_options(parser):
    """Add the options that say how to reach the manager VM."""
    parser.add_argument("-m", "--manager-address", help="address of manager VM")
    parser.add_argument("-p", "--manager-port", help="SSH port of manager VM")
    # "--manager-usr" is a historical misspelling, kept as an alias so that
    # existing invocations keep working.
    parser.add_argument(
        "-u",
        "--manager-user",
        "--manager-usr",
        dest="manager_user",
        help="user on manager VM",
    )


def main():
    """Parse the command line, load configuration and run the subcommand."""
    p = argparse.ArgumentParser()
    p.add_argument("-C", "--use-default-config", action="store_true")
    p.add_argument("-c", "--config")
    p.add_argument("-v", "--verbose", action="store_true")
    p.add_argument("--log", help="log to a file")
    sub = p.add_subparsers()

    manager_defaults = {
        "manager_address": None,
        "manager_port": 22,
        "manager_user": "manager",
    }

    dump = sub.add_parser("dump", help="dump parsed build spec as JSON")
    dump.add_argument("spec")
    dump.set_defaults(func=cmd_dump)

    provision = sub.add_parser("provision", help="provision manager VM")
    provision.set_defaults(func=cmd_provision, **manager_defaults)

    status = sub.add_parser("status", help="check status of manager VM")
    _add_manager_options(status)
    status.set_defaults(func=cmd_status, **manager_defaults)

    build = sub.add_parser("build", help="build according to spec")
    build.add_argument("spec")
    _add_manager_options(build)
    build.set_defaults(func=cmd_build, **manager_defaults)

    args = p.parse_args()
    if not hasattr(args, "func"):
        # No subcommand was given; fail with a usage message rather than an
        # AttributeError further down.
        p.error("a subcommand is required")

    # Configuration files are applied after parsing, so the values they set
    # replace the parsed ones.  A --verbose given on the command line is
    # re-asserted afterwards so that a config file cannot turn it off.
    cli_v = args.verbose
    if args.use_default_config:
        load_default_config(args)
    if args.config:
        load_config(args.config, args)
    if cli_v:
        args.verbose = True

    setup_logging(args)
    args.func(args)


if __name__ == "__main__":
    main()