#!/usr/bin/env python3
"""Run a build on a worker VM that is controlled through a manager VM.

The script talks to the manager VM over SSH. On the manager it uses virsh
and virt-install to create a fresh worker VM from an uploaded image, copies
the source tree and any saved workspace onto a workspace disk, optionally
runs an Ansible playbook against the worker, runs the build commands on the
worker, and finally copies the workspace back to the local machine.
"""

import argparse
import json
import logging
import os
import shlex
import sys
import time
import subprocess
from subprocess import PIPE, STDOUT

import yaml


# The device in the manager VM for the workspace disk.
WS_DEV = '/dev/vdb'

# The worker VM image file on manager VM.
WORKER_IMG = 'worker.img'

# The temporary worker VM image file, used while worker VM is running.
TEMP_IMG = 'temp.img'

# The UID of the worker account, on the worker VM.
WORKER_UID = 1000

# The GID of the worker account, on the worker VM.
WORKER_GID = 1000


class ExecResult:
    """Outcome of running an external command."""

    def __init__(self, stdout, stderr, exit_code):
        self.stdout = stdout
        self.stderr = stderr
        self.exit_code = exit_code
        logging.debug('RESULT: {} {!r} {!r}'.format(stdout, stderr, exit_code))

    def failed(self):
        """Return True if the command exited with a non-zero status."""
        return self.exit_code != 0


def ssh(target, port, argv, quiet=False):
    """Run argv on target over SSH and return an ExecResult.

    Each element of argv is shell-quoted before being handed to ssh, so
    the remote shell sees the same words as the caller passed in.

    When quiet is true the remote output (stdout and stderr combined) is
    captured and available as ExecResult.stdout; otherwise it goes to this
    process's own stdout and ExecResult.stdout is None.
    """
    ssh_argv = ['ssh', '-p', str(port), '--', target]
    argv = ssh_argv + [shlex.quote(arg) for arg in argv]
    logging.info('SSH: {!r}'.format(argv))

    stdout = PIPE if quiet else None

    logging.debug(
        'EXEC: {!r} stdout={!r} stderr={!r}'.format(
            argv, stdout, STDOUT))
    p = subprocess.Popen(argv, stderr=STDOUT, stdout=stdout)
    out, err = p.communicate()
    return ExecResult(out, err, p.returncode)


def rsync(filename, target, port):
    """Copy filename to target with rsync over SSH on the given port.

    Files in the target that do not exist in the source are deleted,
    except for lost+found. Returns an ExecResult.
    """
    argv = [
        'rsync',
        '-essh -p{}'.format(port),
        '-aHSs',
        '--delete',
        '--exclude=lost+found',
        '--',
        filename,
        target,
    ]
    logging.info('RSYNC: {!r}'.format(argv))
    p = subprocess.Popen(argv, stderr=STDOUT, stdout=None)
    out, err = p.communicate()
    return ExecResult(out, err, p.returncode)


class RemoteServer:
    """Execute sequences of RemoteExecution steps on a host over SSH."""

    def __init__(self, ssh_destination, ssh_port, verbose=None):
        self._dest = ssh_destination
        self._port = ssh_port
        self._where = 'manager: {}'.format(ssh_destination)
        self._verbose = verbose

    def quietly(self, *execs):
        """Run the steps, capturing their output in the result."""
        return self._execute(*execs, quiet=True)

    def verbosely(self, *execs):
        """Run the steps, letting their output through to our stdout."""
        return self._execute(*execs, quiet=False)

    def _msg(self, execs):
        """Report the steps about to be run, if a reporter was given."""
        if self._verbose is not None:
            self._verbose('Executing steps on {}'.format(self._where))
            for e in execs:
                m = e.msg() or e.__class__.__name__
                self._verbose(' - ' + m)

    def _argv(self, execs):
        """Combine the steps into a single 'sh -euc' command line."""
        self._msg(execs)
        snippets = [e.as_shell() for e in execs]
        return ['sh', '-euc', '; '.join(snippets)]

    def _execute(self, *execs, quiet=None):
        assert quiet is not None
        return ssh(self._dest, self._port, self._argv(execs), quiet=quiet)


class OnWorker(RemoteServer):
    """Execute steps on the worker VM, hopping through the manager VM."""

    def __init__(self, manager, ssh_port, worker, verbose=None):
        super().__init__(manager, ssh_port, verbose=verbose)
        self._where = 'worker: {}'.format(worker)
        # Command run on the manager to reach the worker.
        self._prefix = ['ssh', worker, '--']

    def _execute(self, *execs, quiet=None):
        assert quiet is not None
        # The command passes through two SSH hops, so it is quoted here
        # for the second hop and again by ssh() for the first.
        argv = [shlex.quote(a) for a in self._argv(execs)]
        return ssh(self._dest, self._port, self._prefix + argv, quiet=quiet)


class RemoteExecution:
    """Base class for one step to be executed on a remote host."""

    def msg(self):
        """Return a human-readable description of the step, or None."""
        return None

    def argv(self):
        """Return the command for the step as a list of words."""
        raise NotImplementedError()

    def as_shell(self):
        """Return the command as a single shell-quoted string."""
        return ' '.join(shlex.quote(arg) for arg in self.argv())


class MayFail(RemoteExecution):
    """A step whose failure does not abort the sequence it is part of."""

    def as_shell(self):
        return super().as_shell() + '|| true'


class TrueCmd(RemoteExecution):
    """Run 'true'; used to check that commands can be run at all."""

    def msg(self):
        return 'can we run true?'

    def argv(self):
        return ['true']


class Ansible(RemoteExecution):
    """Write an inventory and playbook file, then run ansible-playbook."""

    def __init__(self, playbook, worker_ip):
        self._playbook = playbook
        self._worker_ip = worker_ip

    def msg(self):
        return 'running Ansible playbook on worker'

    def argv(self):
        # NOTE(review): the here-document delimiters are unquoted, so the
        # shell expands $variables and backticks in the playbook text.
        # Confirm whether that is intended before quoting them.
        x = ['sh', '-c', '''
cat > hosts << EOF
[all]
worker ansible_ssh_host={ip}
EOF

cat > worker.yaml << EOF
{playbook}
EOF

ansible-playbook -i hosts worker.yaml
'''.format(ip=self._worker_ip, playbook=yaml.safe_dump(self._playbook))]
        logging.debug('Ansible: {!r}'.format(x))
        return x


class Chdir(RemoteExecution):
    """Change the working directory for the following steps."""

    def __init__(self, pathname):
        self._pathname = pathname

    def msg(self):
        return 'cd {}'.format(self._pathname)

    def argv(self):
        return ['cd', self._pathname]


class Build(RemoteExecution):
    """Run the build's shell snippet with 'sh -euxc'."""

    def __init__(self, shell):
        self._shell = shell

    def msg(self):
        return 'building'

    def argv(self):
        return ['sh', '-euxc', self._shell]


def virsh(*args):
    """Return a virsh command line, connecting to qemu:///system."""
    return ['virsh', '-c', 'qemu:///system'] + list(args)


class DestroyWorkerVM(MayFail):
    """Destroy the worker VM; failure is ignored."""

    def msg(self):
        return 'destroying worker VM'

    def argv(self):
        return virsh('destroy', 'worker')


class UndefineWorkerVM(MayFail):
    """Undefine the worker VM; failure is ignored."""

    def msg(self):
        return 'undefining worker VM'

    def argv(self):
        return virsh('undefine', 'worker')


class ShutdownWorkerVM(RemoteExecution):
    """Ask the worker VM to shut down and wait until it is not running."""

    def msg(self):
        return 'shutting down worker VM cleanly'

    def argv(self):
        # Poll the list of running domains until "worker" is no longer an
        # exact line in it. grep needs a pattern: without one it fails
        # immediately and the loop would not wait at all.
        return ['sh', '-c', '''
virsh -c qemu:///system shutdown worker
while virsh -c qemu:///system list --name | grep -qx worker
do
    sleep 1
done
''']


class CopyWorkerImage(RemoteExecution):
    """Make a fresh temporary copy of the worker image."""

    def msg(self):
        return 'copying image file for new worker VM'

    def argv(self):
        script = 'rm -f {temp}; cp {image} {temp}'.format(
            temp=TEMP_IMG, image=WORKER_IMG)
        return ['sh', '-c', script]


class StartGuestNetworking(MayFail):
    """Start the 'default' libvirt network; failure is ignored."""

    def msg(self):
        return 'starting guest network'

    def argv(self):
        return virsh('net-start', 'default')


class MountWS(RemoteExecution):
    """Mount the workspace disk at /mnt on the manager."""

    def msg(self):
        return 'mounting workspace on manager'

    def argv(self):
        return ['sudo', 'mount', WS_DEV, '/mnt']


class MountWSonWorker(RemoteExecution):
    """Mount /dev/vdb at /workspace on the worker."""

    def msg(self):
        return 'mounting workspace on worker'

    def argv(self):
        return ['sudo', 'mount', '/dev/vdb', '/workspace']


class TryUnmountWS(MayFail):
    """Unmount the workspace disk on the manager; failure is ignored."""

    def msg(self):
        return 'trying to unmount workspace on manager'

    def argv(self):
        return ['sudo', 'umount', '--quiet', WS_DEV]


class UnmountWS(RemoteExecution):
    """Unmount the workspace disk on the manager."""

    def msg(self):
        return 'unmounting workspace on manager'

    def argv(self):
        return ['sudo', 'umount', '--quiet', WS_DEV]


class ChownWS(RemoteExecution):
    """Recursively give /mnt to the worker account's UID and GID."""

    def msg(self):
        return 'set ownerships on workspace'

    def argv(self):
        return ['sudo', 'chown', '-R',
                '{}:{}'.format(WORKER_UID, WORKER_GID), '/mnt']


class Mkdir(RemoteExecution):
    """Create a directory with the given owner, group and mode."""

    def __init__(self, pathname, owner='root', group='root', mode=0o755):
        self._argv = [
            'sudo', 'install', '-d',
            '-o', str(owner),
            '-g', str(group),
            '-m', '{:o}'.format(mode),
            pathname,
        ]
        self._pathname = pathname

    def msg(self):
        return 'create {}'.format(self._pathname)

    def argv(self):
        return self._argv


class CreateWorkerVM(RemoteExecution):
    """Create and start the worker VM from the temporary image."""

    def msg(self):
        return 'creating worker VM'

    def argv(self):
        # Give the worker one CPU fewer than /proc/cpuinfo lists, but
        # never fewer than one: "expr 1 - 1" prints 0 and exits non-zero,
        # which would abort the script under "sh -e".
        return ['sh', '-euxc', '''
n="$(grep -c '^processor' /proc/cpuinfo)"
if [ "$n" -gt 1 ]
then
    n="$(expr "$n" - 1)"
fi
virt-install \
  --connect=qemu:///system \
  --quiet \
  --name=worker \
  --memory=4096 \
  --vcpus="$n" \
  --cpu=host \
  --import \
  --os-variant=debian9 \
  --disk=path={img} \
  --network=network=default \
  --graphics=spice \
  --noautoconsole
'''.format(img=TEMP_IMG)]


class AttachWS(RemoteExecution):
    """Attach the workspace disk to the running worker VM as vdb."""

    def msg(self):
        return 'attach workspace disk to worker'

    def argv(self):
        return virsh('--quiet', 'attach-disk', 'worker', WS_DEV, 'vdb',
                     '--targetbus', 'virtio', '--live')


class WorkerIP(RemoteExecution):
    """Wait until the worker accepts SSH logins, then print its address.

    The address is the "ip-address" field of the last entry in the
    dnsmasq status file for virbr0.
    """

    def msg(self):
        return 'get worker IP'

    def argv(self):
        return ['sh', '-euc', '''
status=/var/lib/libvirt/dnsmasq/virbr0.status
while true
do
    ip="$(jq -r '.[-1]["ip-address"]' "$status")"
    ssh-keygen -R "$ip" 2> /dev/null > /dev/null || true
    if ssh "worker@$ip" true 2> /dev/null
    then
        break
    fi
done
echo "$ip"
''']


class Find(RemoteExecution):
    """List everything under a directory with 'find -ls'."""

    def __init__(self, dirname):
        self._dirname = dirname

    def argv(self):
        return ['sudo', 'find', self._dirname, '-ls']


class BuildSpec:
    """Parsed build specification.

    The specification is YAML. The keys 'worker-image', 'source' and
    'build' are required; 'workspace' defaults to an empty string and
    'ansible' to an empty list. A leading ~ in the path values is
    expanded.
    """

    def __init__(self, yaml_text):
        spec = yaml.safe_load(yaml_text)
        self._image = os.path.expanduser(self._get(spec, 'worker-image'))
        self._source = os.path.expanduser(self._get(spec, 'source'))
        self._workspace = os.path.expanduser(self._get(spec, 'workspace', ''))
        self._ansible = self._get(spec, 'ansible', [])
        self._build = self._get(spec, 'build')

    def worker_image(self):
        return self._image

    def ansible(self):
        return self._ansible

    def source(self):
        return self._source

    def workspace(self):
        return self._workspace

    def build(self):
        return self._build

    def as_dict(self):
        """Return the specification as a dict keyed like the YAML input."""
        return {
            'worker-image': self.worker_image(),
            'ansible': self.ansible(),
            'source': self.source(),
            'workspace': self.workspace(),
            'build': self.build(),
        }

    def _get(self, spec, key, default=None):
        """Return spec[key], or default; raise SpecMissingKey if neither."""
        v = spec.get(key, default)
        if v is None:
            raise SpecMissingKey(key)
        return v


class SpecMissingKey(Exception):
    """A required key is missing from the build specification."""

    def __init__(self, key):
        super().__init__(
            'Build specification is missing required key {!r}'.format(key))


class Timer:
    """Context manager that reports how long its body took to run."""

    def __init__(self, report, title):
        self._report = report
        self._title = title
        self._prev = None

    def __enter__(self):
        self._prev = time.time()
        return self

    def __exit__(self, exctype, exc, tb):
        now = time.time()
        duration = now - self._prev
        self._prev = now
        self._report('time: {:.1f} s {}\n'.format(duration, self._title))
        # Never suppress an exception raised by the body.
        return False


def load_build_spec(filename):
    """Read the named file and return it as a BuildSpec."""
    with open(filename) as f:
        return BuildSpec(f.read())


def error(msg):
    """Write an error message to stderr and to the log."""
    sys.stderr.write('ERROR: {}\n'.format(msg))
    logging.error('ERROR: {}'.format(msg))


def verbose(args, msg):
    """Log a message, and also print it if --verbose was given."""
    logging.info(msg)
    if args.verbose:
        print(msg)


def manager_destination(args):
    """Return the (user@address, port) pair for reaching the manager."""
    user = args.manager_user
    addr = args.manager_address
    port = args.manager_port
    return '{}@{}'.format(user, addr), port


def upload_worker_image(vrb, filename, dest, port):
    """Upload the local worker image to the manager; exit on failure."""
    vrb('uploading to manager local worker image {}'.format(filename))
    target = '{}:{}'.format(dest, WORKER_IMG)
    if rsync(filename, target, port).failed():
        error('could not upload image to manager')
        sys.exit(1)


def sync_to_workspace(vrb, frm, dest, port, subdir):
    """Sync a local directory into /mnt/subdir on the manager.

    Exits the program if rsync fails.
    """
    destdir = '/mnt/{}'.format(subdir)
    vrb('syncing local {} to manager {}'.format(frm, destdir))
    er = rsync('{}/.'.format(frm), '{}:{}/.'.format(dest, destdir), port)
    if er.failed():
        error('Failed to rsync saved workspace to manager')
        sys.exit(1)


def sync_from_workspace(vrb, dest, port, ws):
    """Sync /mnt on the manager into the local directory ws.

    The local directory is created if needed. Exits the program if rsync
    fails.
    """
    vrb('syncing manager /mnt to local {}'.format(ws))
    if not os.path.exists(ws):
        os.makedirs(ws)
    er = rsync('{}:/mnt/.'.format(dest), '{}/.'.format(ws), port)
    if er.failed():
        error('Failed to rsync workspace from manager')
        sys.exit(1)


def exec_sequence(how, *execs):
    """Run the steps with how(); exit the program if they fail."""
    er = how(*execs)
    if er.failed():
        error('Failed to do that, giving up')
        sys.exit(1)
    return er


def exec_quietly(manager, *execs):
    """Run the steps quietly on the server; exit on failure."""
    return exec_sequence(manager.quietly, *execs)


def exec_verbosely(manager, *execs):
    """Run the steps verbosely on the server; exit on failure."""
    return exec_sequence(manager.verbosely, *execs)


def cmd_dump(args):
    """Subcommand 'dump': print the parsed build spec as JSON."""
    bs = load_build_spec(args.spec)
    sys.stdout.write('{}\n'.format(json.dumps(bs.as_dict(), indent=4)))


def cmd_status(args):
    """Subcommand 'status': check that the manager VM can run commands."""
    dest, port = manager_destination(args)
    verbose(args, 'manager VM is {}:{}'.format(dest, port))
    manager = RemoteServer(dest, port)
    if manager.quietly(TrueCmd()).failed():
        error('Manager VM is NOT available')
        sys.exit(1)
    verbose(args, 'manager VM is available')


def cmd_build(args):
    """Subcommand 'build': run a complete build according to the spec."""

    def vrb(msg):
        verbose(args, msg)

    vrb('building according to {}'.format(args.spec))
    bs = load_build_spec(args.spec)
    dest, port = manager_destination(args)
    vrb('manager is at {} (port {})'.format(dest, port))

    manager = RemoteServer(dest, port, verbose=vrb)

    with Timer(vrb, 'complete-run'):
        with Timer(vrb, 'upload-worker-image'):
            upload_worker_image(vrb, bs.worker_image(), dest, port)

        # Do the minimum needed to start worker VM. The VM takes a
        # while to boot and we can do other things while that
        # happens.
        with Timer(vrb, 'start-worker'):
            execs = [
                DestroyWorkerVM(),
                UndefineWorkerVM(),
                CopyWorkerImage(),
                StartGuestNetworking(),
                CreateWorkerVM(),
                TryUnmountWS(),
                MountWS(),
                ChownWS(),
            ]
            exec_quietly(manager, *execs)

        with Timer(vrb, 'upload-saved-workspace'):
            ws = bs.workspace()
            if os.path.exists(ws):
                sync_to_workspace(vrb, ws, dest, port, '.')

        with Timer(vrb, 'upload-source'):
            exec_quietly(
                manager,
                Mkdir('/mnt/src', owner=WORKER_UID, group=WORKER_GID))
            src = bs.source()
            sync_to_workspace(vrb, src, dest, port, 'src')

        with Timer(vrb, 'wait-for-worker-to-be-available'):
            execs = [
                UnmountWS(),
                WorkerIP(),
                AttachWS(),
            ]
            er = exec_quietly(manager, *execs)
            # The captured output of the sequence is the address echoed
            # by the WorkerIP step.
            worker_ip = er.stdout.decode('UTF8').strip()

        with Timer(vrb, 'prepare-workspace-worker'):
            worker_dest = 'worker@{}'.format(worker_ip)
            vrb('worker is at {} (via manager)'.format(worker_dest))
            worker = OnWorker(dest, port, worker_dest, verbose=vrb)
            exec_quietly(worker, Mkdir('/workspace'), MountWSonWorker())

        with Timer(vrb, 'prepare-worker-with-ansible'):
            ansible = bs.ansible()
            if ansible:
                exec_verbosely(manager, Ansible(ansible, worker_ip))

        with Timer(vrb, 'build'):
            execs = [
                Chdir('/workspace/src'),
                Build(bs.build()),
            ]
            # A failed build must not stop us from shutting down the
            # worker and saving the workspace, so only remember it here.
            build_failed = worker.verbosely(*execs).failed()

        with Timer(vrb, 'shutdown-worker'):
            execs = [
                ShutdownWorkerVM(),
                MountWS(),
            ]
            exec_quietly(manager, *execs)

        with Timer(vrb, 'save-workspace'):
            if ws:
                vrb('saving workspace to {}'.format(ws))
                sync_from_workspace(vrb, dest, port, ws)

    if build_failed:
        error('build FAILED')
        sys.exit(1)

    vrb('build finished OK')


def setup_logging(args):
    """Send log messages to the --log file, or nowhere if not given."""
    if args.log:
        fmt = '%(asctime)s %(levelname)s %(message)s'
        datefmt = '%Y-%m-%d %H:%M:%S'
        formatter = logging.Formatter(fmt, datefmt)

        handler = logging.FileHandler(args.log)
        handler.setFormatter(formatter)
    else:
        handler = logging.NullHandler()

    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)


def _add_manager_options(parser):
    """Add the options that say how to reach the manager VM."""
    parser.add_argument(
        '-m', '--manager-address', help='address of manager VM')
    parser.add_argument(
        '-p', '--manager-port', type=int, help='SSH port of manager VM')
    # --manager-usr is the old, misspelled name of the option; it is
    # still accepted so that existing invocations keep working.
    parser.add_argument(
        '-u', '--manager-user', '--manager-usr', dest='manager_user',
        help='user on manager VM')


def main():
    """Parse the command line and run the chosen subcommand."""
    p = argparse.ArgumentParser()
    p.add_argument('-v', '--verbose', action='store_true')
    p.add_argument('--log', help='log to a file')
    sub = p.add_subparsers()

    manager_defaults = {
        'manager_address': None,
        'manager_port': 22,
        'manager_user': 'manager',
    }

    dump = sub.add_parser('dump', help='dump parsed build spec as JSON')
    dump.add_argument('spec')
    dump.set_defaults(func=cmd_dump)

    status = sub.add_parser('status', help='check status of manager VM')
    _add_manager_options(status)
    status.set_defaults(func=cmd_status, **manager_defaults)

    build = sub.add_parser('build', help='build according to spec')
    build.add_argument('spec')
    _add_manager_options(build)
    build.set_defaults(func=cmd_build, **manager_defaults)

    args = p.parse_args()
    if not hasattr(args, 'func'):
        # No subcommand was given; argparse does not require one itself.
        p.error('a subcommand is required')

    setup_logging(args)
    args.func(args)


if __name__ == '__main__':
    main()