#!/usr/bin/env python3
"""Run a build in a freshly created worker VM, driven through a manager VM.

The manager VM is reached over ssh. Through it, this program creates a
worker VM with libvirt, prepares a workspace disk, uploads the source
tree, runs the build on the worker, and syncs the workspace back.
"""

import json
import logging
import os
import shlex
import subprocess
import sys
import time
from subprocess import PIPE, STDOUT

import cliapp
import yaml


# The device in the manager VM for the workspace disk.
WS_DEV = '/dev/vdb'

# The worker VM image file on manager VM.
WORKER_IMG = 'worker.img'

# The temporary worker VM image file, used while worker VM is running.
TEMP_IMG = 'temp.img'

# The UID of the worker account, on the worker VM.
WORKER_UID = 1000

# The GID of the worker account, on the worker VM.
WORKER_GID = 1000


class ContractorApplication(cliapp.Application):
    """Command line application: `dump`, `build` and `manager-status`."""

    def add_settings(self):
        """Declare the command line settings."""
        self.settings.string(
            ['manager-address', 'm'],
            'address of the manager VM',
            metavar='ADDR')

        self.settings.string(
            ['manager-user'],
            'user of the manager in the manager VM',
            default='manager',
            metavar='USERNAME')

        self.settings.boolean(
            ['verbose', 'v'],
            'be verbose',
            default=False)

    def _spec_filename(self, args):
        """Return the build spec filename (first argument), or exit with an error."""
        if not args:
            self.error('missing argument: build specification file')
            sys.exit(1)
        return args[0]

    def cmd_dump(self, args):
        """Write the parsed build spec to the output as indented JSON."""
        bs = self.load_build_spec(self._spec_filename(args))
        self.output.write('{}\n'.format(json.dumps(bs.as_dict(), indent=4)))

    def cmd_build(self, args):
        """Run a complete build according to the spec named by args[0].

        Steps:
        - Upload the worker image and (re)create the worker VM on the
          manager.
        - Upload the saved workspace, if any, and the source tree onto the
          workspace disk.
        - Attach the disk to the worker and run the install and build
          steps there.
        - Shut the worker down and sync the workspace back locally, if the
          spec names a workspace.

        Any failing step logs an error and exits the process with status 1.
        """
        filename = self._spec_filename(args)
        timer = Timer(self.verbose)
        overall = Timer(self.verbose)

        dest = self.manager_destination()
        manager = RemoteServer(dest, verbose=self.verbose)

        self.verbose('building using spec at {}'.format(filename))
        bs = self.load_build_spec(filename)

        self.upload_worker_image(bs.worker_image(), dest)
        timer.report('upload-worker-image')

        # Ask the manager for its CPU count first; the worker VM is sized
        # from it below.
        er = self.exec_sequence(manager, GetCPUCount())
        timer.report('get-cpu-count')
        cpus = parse_cpu_count(er.stdout.decode('UTF8'))

        execs = [
            DestroyWorkerVM(),
            UndefineWorkerVM(),
            CopyWorkerImage(),
            StartGuestNetworking(),
            CreateWorkerVM(cpus),
            TryUnmountWS(),
            MountWS(),
            ChownWS(),
            Mkdir('/mnt/src', owner=WORKER_UID, group=WORKER_GID),
        ]
        self.exec_sequence(manager, *execs)
        timer.report('setup')

        self.verbose('setting up workspace on worker')
        ws = bs.workspace()
        # An empty workspace setting means "no saved workspace";
        # os.path.exists('') is False.
        if os.path.exists(ws):
            self.sync_to_workspace(ws, dest, '.')
            timer.report('upload-saved-workspace')

        src = bs.source()
        self.sync_to_workspace(src, dest, 'src')
        timer.report('upload-source')

        # The manager must release the workspace disk before it is
        # attached to the worker.
        execs = [
            UnmountWS(),
            WorkerIP(),
        ]
        er = self.exec_sequence(manager, *execs)
        worker_ip = er.stdout.decode('UTF8').strip()
        timer.report('wait-for-worker')

        self.exec_sequence(manager, AttachWS())

        worker = OnWorker(
            dest, 'worker@{}'.format(worker_ip), verbose=self.verbose)
        self.exec_sequence(worker, MountWSonWorker())

        execs = []
        install = bs.install()
        if install:
            execs.append(AptInstall(install))
        execs.append(Chdir('/workspace/src'))
        execs.append(Build(bs.build()))
        self.exec_sequence(worker, *execs)
        timer.report('build')

        # ShutdownWorkerVM waits for the worker to stop, so the manager
        # can safely mount the workspace disk again.
        execs = [
            ShutdownWorkerVM(),
            MountWS(),
        ]
        self.exec_sequence(manager, *execs)
        timer.report('shutdown-worker')

        if ws:
            self.verbose('saving workspace to {}'.format(ws))
            self.sync_from_workspace(dest, ws)
            timer.report('save-workspace')

        overall.report('complete-run')
        self.verbose('build finished OK')

    def load_build_spec(self, filename):
        """Read and parse the YAML build spec in `filename`."""
        with open(filename) as f:
            return BuildSpec(f.read())

    def upload_worker_image(self, filename, dest):
        """rsync the local worker image to WORKER_IMG on the manager; exit on failure."""
        self.verbose('uploading image to worker: {}'.format(filename))
        target = '{}:{}'.format(dest, WORKER_IMG)
        if rsync(filename, target).failed():
            self.error('could not upload image to worker')
            sys.exit(1)

    def sync_to_workspace(self, frm, dest, subdir):
        """rsync local directory `frm` into /mnt/`subdir` on the manager; exit on failure."""
        er = rsync(
            '{}/.'.format(frm),
            '{}:/mnt/{}/.'.format(dest, subdir))
        if er.failed():
            # Name the path: this is used both for the saved workspace
            # and for the source tree.
            self.error('Failed to rsync {} to workspace'.format(frm))
            sys.exit(1)

    def sync_from_workspace(self, dest, ws):
        """rsync the manager's /mnt into local directory `ws`; exit on failure."""
        os.makedirs(ws, exist_ok=True)
        er = rsync('{}:/mnt/.'.format(dest), '{}/.'.format(ws))
        if er.failed():
            self.error('Failed to rsync workspace from worker')
            sys.exit(1)

    def exec_sequence(self, manager, *execs):
        """Run `execs` quietly on `manager`; exit on failure, else return the ExecResult."""
        er = manager.quietly(*execs)
        if er.failed():
            self.error('Failed to do that, giving up')
            sys.exit(1)
        return er

    def cmd_manager_status(self, args):
        """Check that `true` can be run on the manager VM over ssh."""
        dest = self.manager_destination()
        manager = RemoteServer(dest)
        if manager.quietly(TrueCmd()).failed():
            self.error('Manager VM is NOT available')
            sys.exit(1)
        self.verbose('Manager VM is available')

    def manager_destination(self):
        """Return the ssh destination 'user@address' of the manager VM."""
        user = self.settings['manager-user']
        addr = self.settings['manager-address']
        return '{}@{}'.format(user, addr)

    def error(self, msg):
        """Report an error on stderr and in the log."""
        sys.stderr.write('ERROR: {}\n'.format(msg))
        logging.error('ERROR: {}'.format(msg))

    def verbose(self, msg):
        """Log `msg`, and also print it when --verbose is set."""
        logging.info(msg)
        if self.settings['verbose']:
            print(msg)


def parse_cpu_count(lscpu_output):
    """Return the CPU count from `lscpu` output text, or 1 if none is found.

    If several lines start with 'CPU(s):', the last one wins.
    """
    cpus = 1
    for line in lscpu_output.splitlines():
        if line.startswith('CPU(s):'):
            cpus = int(line.split()[-1])
    return cpus


class ExecResult:
    """Captured output and exit code of a finished subprocess."""

    def __init__(self, stdout, stderr, exit_code):
        self.stdout = stdout
        self.stderr = stderr
        self.exit_code = exit_code
        logging.debug('RESULT: {} {!r} {!r}'.format(stdout, stderr, exit_code))

    def failed(self):
        """Return True if the process exited with a non-zero code."""
        return self.exit_code != 0


def ssh(target, argv, quiet=False):
    """Run `argv` on `target` via ssh and return an ExecResult.

    Each argument is shell-quoted, because the remote side joins them
    into one shell command line. stderr is merged into stdout. When
    `quiet` is true, output is captured in `ExecResult.stdout`; otherwise
    it goes to this process's stdout and `ExecResult.stdout` is None.
    """
    argv = ['ssh', '--', target] + [shlex.quote(arg) for arg in argv]
    logging.info('SSH: {!r}'.format(argv))

    stdout = None
    if quiet:
        stdout = PIPE

    logging.debug(
        'EXEC: {!r} stdout={!r} stderr={!r}'.format(
            argv, stdout, STDOUT))

    p = subprocess.Popen(argv, stderr=STDOUT, stdout=stdout)
    out, err = p.communicate()
    return ExecResult(out, err, p.returncode)


def rsync(filename, target):
    """rsync `filename` to `target` (with --delete, excluding lost+found)."""
    argv = [
        'rsync',
        '-aHSs',
        '--delete',
        '--exclude=lost+found',
        '--',
        filename,
        target,
    ]
    logging.info('RSYNC: {!r}'.format(argv))
    p = subprocess.Popen(argv, stderr=STDOUT, stdout=None)
    out, err = p.communicate()
    return ExecResult(out, err, p.returncode)


class RemoteServer:
    """Runs sequences of RemoteExecution steps on a host reachable by ssh.

    The steps are joined into a single `sh -euc` script, so the first
    failing step stops the sequence (unless it is a MayFail step).
    """

    def __init__(self, ssh_destination, verbose=None):
        self._dest = ssh_destination
        # Optional callable used to report each step's message.
        self._verbose = verbose

    def quietly(self, *execs):
        """Run `execs`, capturing their output."""
        return self._execute(*execs, quiet=True)

    def verbosely(self, *execs):
        """Run `execs`, letting their output go to our stdout."""
        return self._execute(*execs, quiet=False)

    def _msg(self, execs):
        """Report each step's message (or class name) via the verbose callback."""
        if self._verbose is not None:
            for e in execs:
                m = e.msg() or e.__class__.__name__
                self._verbose('  ' + m)

    def _argv(self, execs):
        """Return the argv running all `execs` in one `sh -euc` script."""
        self._msg(execs)
        snippets = [e.as_shell() for e in execs]
        return ['sh', '-euc', '; '.join(snippets)]

    def _execute(self, *execs, quiet=None):
        assert quiet is not None
        return ssh(self._dest, self._argv(execs), quiet=quiet)


class OnWorker(RemoteServer):
    """Runs steps on the worker VM by ssh-ing through the manager VM."""

    def __init__(self, manager, worker, verbose=None):
        super().__init__(manager, verbose=verbose)
        self._prefix = ['ssh', worker, '--']

    def _execute(self, *execs, quiet=None):
        assert quiet is not None
        # Quote once for the worker's shell here; ssh() quotes the whole
        # command line again for the manager's shell.
        argv = [shlex.quote(a) for a in self._argv(execs)]
        return ssh(self._dest, self._prefix + argv, quiet=quiet)


class RemoteExecution:
    """One step of a remote shell script; subclasses provide argv()."""

    def msg(self):
        """Return a human-readable description, or None."""
        return None

    def argv(self):
        """Return the command to run, as a list of arguments."""
        raise NotImplementedError()

    def as_shell(self):
        """Return argv() as a single shell-quoted command string."""
        return ' '.join(shlex.quote(arg) for arg in self.argv())


class MayFail(RemoteExecution):
    """A step whose failure is ignored by the enclosing `sh -e` script."""

    def as_shell(self):
        return super().as_shell() + ' || true'


class TrueCmd(RemoteExecution):

    def msg(self):
        return 'can we run true?'

    def argv(self):
        return ['true']


class AptInstall(RemoteExecution):
    """Install Debian packages non-interactively with apt-get via sudo."""

    def __init__(self, packages):
        self._packages = packages

    def msg(self):
        return 'installing packages: ' + ' '.join(self._packages)

    def argv(self):
        return ['sudo', 'DEBIAN_FRONTEND=noninteractive',
                'apt-get', '-y', 'install'] + self._packages


class Chdir(RemoteExecution):
    """Change directory for the rest of the enclosing script."""

    def __init__(self, pathname):
        self._pathname = pathname

    def msg(self):
        return 'cd {}'.format(self._pathname)

    def argv(self):
        return ['cd', self._pathname]


class Build(RemoteExecution):
    """Run the spec's build shell snippet with `sh -euxc`."""

    def __init__(self, shell):
        self._shell = shell

    def msg(self):
        return 'building'

    def argv(self):
        return ['sh', '-euxc', self._shell]


def virsh(*args):
    """Return a virsh argv connected to the qemu:///system libvirt URI."""
    return ['virsh', '-c', 'qemu:///system'] + list(args)


class DestroyWorkerVM(MayFail):

    def msg(self):
        return 'destroying worker VM'

    def argv(self):
        return virsh('destroy', 'worker')


class UndefineWorkerVM(MayFail):

    def msg(self):
        return 'undefining worker VM'

    def argv(self):
        return virsh('undefine', 'worker')


class ShutdownWorkerVM(RemoteExecution):
    """Ask the worker VM to shut down, then wait until it is no longer running."""

    def msg(self):
        return 'shutting down worker VM cleanly'

    def argv(self):
        # `virsh list --name` prints the names of running domains; poll
        # until 'worker' disappears. grep needs a pattern: without one
        # it exits with a usage error and the loop would never wait.
        return ['sh', '-c', '''
virsh -c qemu:///system shutdown worker
while virsh -c qemu:///system list --name | grep -qx worker
do
    sleep 1
done
''']


class CopyWorkerImage(RemoteExecution):
    """Replace TEMP_IMG with a fresh copy of WORKER_IMG on the manager."""

    def msg(self):
        return 'copying image file for new worker VM'

    def argv(self):
        return ['sh', '-c', 'rm -f {tmp}; cp {img} {tmp}'.format(
            tmp=shlex.quote(TEMP_IMG), img=shlex.quote(WORKER_IMG))]


class StartGuestNetworking(MayFail):

    def msg(self):
        return 'starting guest network'

    def argv(self):
        return virsh('net-start', 'default')


class MountWS(RemoteExecution):

    def msg(self):
        return 'mounting workspace on manager'

    def argv(self):
        return ['sudo', 'mount', WS_DEV, '/mnt']


class MountWSonWorker(RemoteExecution):

    def msg(self):
        return 'mounting workspace on worker'

    def argv(self):
        # AttachWS attaches the workspace disk to the worker as target 'vdb'.
        return ['sudo', 'mount', '/dev/vdb', '/workspace']


class TryUnmountWS(MayFail):

    def msg(self):
        return 'trying to unmount workspace on manager'

    def argv(self):
        return ['sudo', 'umount', '--quiet', WS_DEV]


class UnmountWS(RemoteExecution):

    def msg(self):
        return 'unmounting workspace on manager'

    def argv(self):
        return ['sudo', 'umount', '--quiet', WS_DEV]


class ChownWS(RemoteExecution):
    """Give the worker account ownership of everything under /mnt."""

    def msg(self):
        return 'set ownerships on workspace'

    def argv(self):
        return ['sudo', 'chown', '-R',
                '{}:{}'.format(WORKER_UID, WORKER_GID), '/mnt']


class Mkdir(RemoteExecution):
    """Create a directory with given owner, group and mode via `install -d`."""

    def __init__(self, pathname, owner='root', group='root', mode=0o755):
        self._argv = [
            'install', '-d',
            '-o', str(owner),
            '-g', str(group),
            '-m', '{:o}'.format(mode),
            pathname,
        ]
        self._pathname = pathname

    def msg(self):
        return 'create {}'.format(self._pathname)

    def argv(self):
        return self._argv


class GetCPUCount(RemoteExecution):

    def msg(self):
        return 'get CPU count'

    def argv(self):
        return ['lscpu']


class CreateWorkerVM(RemoteExecution):
    """Create the worker VM from TEMP_IMG with virt-install."""

    def __init__(self, manager_cpus):
        # Leave one CPU for the manager, but always give the worker at least one.
        self._worker_cpus = max(manager_cpus - 1, 1)

    def msg(self):
        return 'creating worker VM'

    def argv(self):
        return [
            'virt-install',
            '--connect', 'qemu:///system',
            '--quiet',
            '--name=worker',
            '--memory=4096',
            '--vcpus={}'.format(self._worker_cpus),
            '--cpu=host',
            '--import',
            '--os-variant=debian9',
            '--disk=path={},cache=none'.format(TEMP_IMG),
            '--network=network=default',
            '--graphics=spice',
            '--noautoconsole',
        ]


class AttachWS(RemoteExecution):
    """Hot-attach the manager's workspace disk to the running worker as 'vdb'."""

    def msg(self):
        return 'attach workspace disk to worker'

    def argv(self):
        return virsh('attach-disk', 'worker', WS_DEV, 'vdb',
                     '--targetbus', 'virtio', '--live')


class WorkerIP(RemoteExecution):
    """Wait until the worker answers ssh, then print its IP address.

    The IP is taken from the last entry in libvirt's dnsmasq status file
    for virbr0. Any stale known_hosts key for that IP is removed first.
    """

    def msg(self):
        return 'get worker IP'

    def argv(self):
        return ['sh', '-euc', '''
status=/var/lib/libvirt/dnsmasq/virbr0.status
while true
do
    ip="$(jq -r '.[-1]["ip-address"]' "$status")"
    ssh-keygen -R "$ip" 2> /dev/null > /dev/null
    if ssh "worker@$ip" true 2> /dev/null
    then
        break
    fi
done
echo "$ip"
''']


class BuildSpec:
    """A parsed YAML build specification.

    Required keys: worker-image, source, build. Optional: workspace
    (default '') and install (default []). `~` is expanded in the paths.
    """

    def __init__(self, yaml_text):
        spec = yaml.safe_load(yaml_text)
        if spec is None:
            # An empty document: report the missing keys, not an AttributeError.
            spec = {}
        self._image = os.path.expanduser(self._get(spec, 'worker-image'))
        self._source = os.path.expanduser(self._get(spec, 'source'))
        self._workspace = os.path.expanduser(self._get(spec, 'workspace', ''))
        self._install = self._get(spec, 'install', [])
        self._build = self._get(spec, 'build')

    def worker_image(self):
        return self._image

    def install(self):
        return self._install

    def source(self):
        return self._source

    def workspace(self):
        return self._workspace

    def build(self):
        return self._build

    def as_dict(self):
        return {
            'worker-image': self.worker_image(),
            'install': self.install(),
            'source': self.source(),
            'workspace': self.workspace(),
            'build': self.build(),
        }

    def _get(self, spec, key, default=None):
        """Return spec[key] (or `default`); raise SpecMissingKey if that is None."""
        v = spec.get(key, default)
        if v is None:
            raise SpecMissingKey(key)
        return v


class SpecMissingKey(Exception):

    def __init__(self, key):
        super().__init__(
            'Build specification is missing required key {!r}'.format(key))


class Timer:
    """Reports the time elapsed since the previous report (or creation)."""

    def __init__(self, report):
        # Callable that receives the formatted timing message.
        self._report = report
        self._prev = time.time()

    def report(self, msg):
        now = time.time()
        duration = now - self._prev
        self._prev = now
        self._report('time: {:.1f} s {}'.format(duration, msg))


if __name__ == '__main__':
    ContractorApplication().run()