#!/usr/bin/env python3
"""Run a build inside a throw-away worker VM hosted on a manager VM.

The manager VM is reached over ssh. On it, a worker VM is created with
virt-install from an uploaded image. The source tree (and optionally a
previously saved workspace) is copied onto a workspace disk that is handed
to the worker. Packages are then installed on the worker and the build's
shell text is run there. Afterwards the workspace can be copied back to the
local host.
"""

import json
import logging
import os
import shlex
import subprocess
import sys
import time
from subprocess import PIPE, STDOUT

import cliapp
import yaml


# The device in the manager VM for the workspace disk.
WS_DEV = '/dev/vdb'

# The worker VM image file on manager VM.
WORKER_IMG = 'worker.img'

# The temporary worker VM image file, used while worker VM is running.
TEMP_IMG = 'temp.img'

# The UID of the worker account, on the worker VM.
WORKER_UID = 1000

# The GID of the worker account, on the worker VM.
WORKER_GID = 1000


class ContractorApplication(cliapp.Application):
    """Command line application; each ``cmd_*`` method is a subcommand."""

    def add_settings(self):
        """Declare the command line settings understood by the tool."""
        self.settings.string(
            ['manager-address', 'm'],
            'address of the manager VM',
            metavar='ADDR')

        self.settings.string(
            ['manager-user'],
            'user of the manager in the manager VM',
            default='manager',
            metavar='USERNAME')

        self.settings.boolean(
            ['verbose', 'v'],
            'be verbose',
            default=False)

    def cmd_dump(self, args):
        """Print the build spec in args[0], after parsing, as JSON."""
        bs = self.load_build_spec(args[0])
        self.output.write('{}\n'.format(json.dumps(bs.as_dict(), indent=4)))

    def cmd_build(self, args):
        """Run a complete build using the build spec in args[0].

        Steps: upload the worker image, remove any old worker, prepare
        the workspace disk, start the worker, install packages, run the
        build, stop the worker and (if configured) save the workspace
        back to the local host. Any hard failure exits with status 1.
        """
        timer = Timer(self.verbose)
        overall = Timer(self.verbose)

        self.verbose('building using spec at {}'.format(args[0]))
        bs = self.load_build_spec(args[0])
        m = self.manager(timer)
        timer.report('load-build-spec')

        self.verbose(
            'uploading image to worker: {}'.format(bs.worker_image()))
        if m.upload_worker_image(bs.worker_image()).failed():
            self.error('could not upload image to worker')
            sys.exit(1)

        # This might fail. We ignore the failure.
        self.verbose('stopping worker')
        m.kill_worker()

        self.verbose('setting up workspace on worker')
        ws = bs.workspace()
        source = bs.source()
        if m.setup_workspace(ws, source).failed():
            self.error('could not set up workspace')
            sys.exit(1)

        self.verbose('starting worker')
        w = m.start_worker()
        if not w:
            self.error('could not start worker')
            sys.exit(1)

        install = bs.install()
        if install:
            self.verbose(
                'installing packages: {}'.format(', '.join(install)))
            if w.install_packages(install).failed():
                self.error('failed to install packages')
                sys.exit(1)
        else:
            self.verbose('no packages to install')

        self.verbose('building')
        if w.build(bs.build()).failed():
            self.error('build failed')
            sys.exit(1)
        timer.report('build')

        # This might fail. We ignore the failure.
        self.verbose('stopping worker')
        m.stop_worker()

        if ws:
            self.verbose('saving workspace to {}'.format(ws))
            m.save_workspace(ws)

        overall.report('complete-run')
        self.verbose('build finished OK')

    def load_build_spec(self, filename):
        """Read and parse the YAML build spec at filename."""
        with open(filename) as f:
            return BuildSpec(f.read())

    def cmd_manager_status(self, args):
        """Exit 0 if `true` can be run on the manager over ssh, else 1."""
        timer = Timer(self.verbose)
        m = self.manager(timer)
        if m.ssh(['true']).failed():
            self.error('Manager VM is NOT available')
            sys.exit(1)
        self.verbose('Manager VM is available')

    def manager(self, timer):
        """Return a Manager for the configured manager user and address."""
        user = self.settings['manager-user']
        addr = self.settings['manager-address']
        return Manager(user, addr, timer)

    def error(self, msg):
        """Report msg both on stderr and in the log."""
        sys.stderr.write('ERROR: {}\n'.format(msg))
        logging.error('ERROR: {}'.format(msg))

    def verbose(self, msg):
        """Log msg; also print it when --verbose is set."""
        logging.info(msg)
        if self.settings['verbose']:
            print(msg)


class ExecResult:
    """Outcome of an external command.

    stdout is the captured output as bytes, or None when output was not
    captured. Because stderr is always merged into stdout by the callers
    in this file, stderr is normally None.
    """

    def __init__(self, stdout, stderr, exit_code):
        self.stdout = stdout
        self.stderr = stderr
        self.exit_code = exit_code
        logging.debug('RESULT: {} {!r} {!r}'.format(stdout, stderr, exit_code))

    def failed(self):
        """Return True if the command exited with a non-zero status."""
        return self.exit_code != 0


def ssh(target, argv, quiet=False):
    """Run argv on target ("user@host") over ssh and return an ExecResult.

    Every argument is shell-quoted, because ssh joins the remote command
    into a single string that the remote shell parses again. With
    quiet=True the output is captured instead of going to our stdout.
    """
    argv = ['ssh', '--', target] + [shlex.quote(arg) for arg in argv]
    logging.info('SSH: {!r}'.format(argv))

    stdout = PIPE if quiet else None

    logging.debug(
        'EXEC: {!r} stdout={!r} stderr={!r}'.format(
            argv, stdout, STDOUT))
    p = subprocess.Popen(argv, stderr=STDOUT, stdout=stdout)
    out, err = p.communicate()
    return ExecResult(out, err, p.returncode)


def rsync(filename, target):
    """Mirror filename to target with rsync, deleting extraneous files.

    lost+found is excluded so filesystem roots can be synced safely.
    """
    argv = [
        'rsync', '-aHSs', '--delete', '--exclude=lost+found', '--',
        filename, target,
    ]
    logging.info('RSYNC: {!r}'.format(argv))
    p = subprocess.Popen(argv, stderr=STDOUT, stdout=PIPE)
    out, err = p.communicate()
    return ExecResult(out, err, p.returncode)


class BuildSpec:
    """A parsed YAML build specification.

    Required keys: worker-image, source, build. Optional keys:
    workspace (default ''), install (default []). Paths get ~ expanded.

    Raises:
        SpecMissingKey: if a required key is missing or null.
    """

    def __init__(self, yaml_text):
        # An empty document parses to None; treat it as an empty mapping
        # so the user gets a SpecMissingKey error, not an AttributeError.
        spec = yaml.safe_load(yaml_text) or {}
        self._image = os.path.expanduser(self._get(spec, 'worker-image'))
        self._source = os.path.expanduser(self._get(spec, 'source'))
        self._workspace = os.path.expanduser(self._get(spec, 'workspace', ''))
        self._install = self._get(spec, 'install', [])
        self._build = self._get(spec, 'build')

    def worker_image(self):
        return self._image

    def install(self):
        return self._install

    def source(self):
        return self._source

    def workspace(self):
        return self._workspace

    def build(self):
        return self._build

    def as_dict(self):
        """Return the spec as a dict using the YAML key names."""
        return {
            'worker-image': self.worker_image(),
            'install': self.install(),
            'source': self.source(),
            'workspace': self.workspace(),
            'build': self.build(),
        }

    def _get(self, spec, key, default=None):
        # A None result (missing key without default, or explicit null)
        # counts as missing.
        v = spec.get(key, default)
        if v is None:
            raise SpecMissingKey(key)
        return v


class SpecMissingKey(Exception):
    """Raised when a build spec lacks a required key."""

    def __init__(self, key):
        super().__init__(
            'Build specification is missing required key {!r}'.format(key))


class Manager:
    """Operations performed on the manager VM over ssh."""

    def __init__(self, user, addr, timer):
        self._target = '{}@{}'.format(user, addr)
        self._timer = timer

    def ssh(self, argv, quiet=False):
        """Run argv on the manager VM."""
        return ssh(self._target, argv, quiet=quiet)

    def virsh(self, argv):
        """Run a virsh subcommand against qemu:///system, capturing output."""
        ret = ssh(
            self._target, ['virsh', '-c', 'qemu:///system'] + argv, quiet=True)
        self._timer.report('virsh {}'.format(' '.join(argv)))
        return ret

    def kill_worker(self):
        """Forcibly stop and undefine the worker domain; failures ignored."""
        self.virsh(['destroy', 'worker'])
        self.virsh(['undefine', 'worker'])

    def stop_worker(self):
        """Ask the worker to shut down and wait until it is no longer running.

        Only the "worker" domain is waited for, so other running domains
        on the manager do not keep this loop spinning.
        """
        self.virsh(['shutdown', 'worker'])
        while True:
            er = self.virsh(['list', '--name'])
            if er.failed():
                break
            running = er.stdout.decode('UTF8').split()
            if 'worker' not in running:
                break
            time.sleep(2)

    def upload_worker_image(self, filename):
        """Copy the local worker image to WORKER_IMG on the manager."""
        ret = rsync(filename, '{}:{}'.format(self._target, WORKER_IMG))
        self._timer.report('upload-worker-image')
        return ret

    def start_worker(self):
        """Create and boot a new worker VM; return a Worker, or None.

        The worker boots from a fresh copy of the image and gets WS_DEV
        as its second disk.
        """
        if self.copy_worker_image().failed():
            logging.error('Could not copy worker image for new instance')
            return None

        # We ignore failures on these, as they shouldn't matter.
        self.virsh(['net-autostart', 'default'])
        self.virsh(['net-start', 'default'])

        n = self.get_cpu_count()
        if not isinstance(n, int):
            logging.error('Could not start worker due to missing CPU count')
            return None
        self._timer.report('get-cpu-count')

        # Leave one CPU for the manager itself, but never ask for zero.
        vcpus = max(1, n - 1)

        er = self.ssh([
            'virt-install',
            '--connect', 'qemu:///system',
            '--quiet',
            '--name=worker',
            '--memory=4096',
            '--vcpus={}'.format(vcpus),
            '--cpu=host',
            '--import',
            '--os-variant=debian9',
            '--disk=path={},cache=none'.format(TEMP_IMG),
            '--disk=path={},cache=none'.format(WS_DEV),
            '--network=network=default',
            '--graphics=spice',
            '--noautoconsole',
        ])
        if er.failed():
            logging.error('Could not create worker VM')
            return None
        self._timer.report('virt-install')

        return self.wait_for_worker()

    def copy_worker_image(self):
        """Replace TEMP_IMG with a fresh copy of WORKER_IMG on the manager."""
        self.ssh(['rm', '-f', TEMP_IMG])
        ret = self.ssh(['cp', WORKER_IMG, TEMP_IMG])
        self._timer.report('copy-worker-image')
        return ret

    def get_cpu_count(self):
        """Return the manager's CPU count from `lscpu`, or None on failure."""
        er = self.ssh(['lscpu'], quiet=True)
        if er.failed():
            # stderr is merged into stdout, so stdout holds the diagnostics.
            logging.error('lscpu on manager failed: {!r}'.format(er.stdout))
            return None
        for line in er.stdout.decode('UTF8').splitlines():
            if line.startswith('CPU(s):'):
                return int(line.split()[-1])
        logging.error('Could not find number of CPUs on manager')
        return None

    def worker_ip(self):
        """Return the IP address of the newest DHCP lease, or None.

        Reads libvirt's dnsmasq status file (a JSON list of leases) and
        picks the lease with the latest expiry time.
        """
        filename = '/var/lib/libvirt/dnsmasq/virbr0.status'
        er = self.ssh(['cat', filename], quiet=True)
        if er.failed():
            logging.error('Could not read dnsmasq status file')
            return None
        if not er.stdout:
            return None
        status = json.loads(er.stdout)
        if not status:
            return None
        status.sort(key=lambda e: e['expiry-time'])
        self._timer.report('worker-ip')
        return status[-1]['ip-address']

    def wait_for_worker(self):
        """Poll until the worker accepts ssh, then return a Worker for it."""
        # We look up the IP and try to use it. The IP might be for a
        # previous worker instance.
        while True:
            ip = self.worker_ip()
            if ip is not None:
                w = Worker(self, ip)
                # The worker is new, so forget any host key stored for
                # this IP from an earlier instance.
                self.ssh(['ssh-keygen', '-R', ip])
                if not w.ssh(['true'], quiet=True).failed():
                    self._timer.report('wait-for-worker')
                    return w
            # Sleep on every unsuccessful attempt, including "no IP yet",
            # to avoid hammering the manager with ssh connections.
            time.sleep(2)

    def setup_workspace(self, saved_workspace, source):
        """Populate the workspace disk, then unmount it; return the result.

        The disk is always unmounted afterwards: the worker VM is given
        WS_DEV as a raw disk and it must not also be mounted on the
        manager, and save_workspace() mounts it again later.
        """
        er = self.setup_helper(saved_workspace, source)
        if er.failed():
            logging.error('Could not set up workspace')
        umount_er = self.umount(WS_DEV)
        return er if er.failed() else umount_er

    def setup_helper(self, saved_workspace, source):
        """Mount WS_DEV on /mnt and fill it; stop at the first failure.

        The saved workspace (if it exists locally) is copied to /mnt and
        the source tree to /mnt/src, both owned by the worker account.
        """
        er = self.mount(WS_DEV)
        if er.failed():
            return er
        self._timer.report('mount-workspace')

        er = self.ssh(
            ['sudo', 'chown', '-R',
             '{}:{}'.format(WORKER_UID, WORKER_GID), '/mnt'])
        if er.failed():
            return er
        self._timer.report('chown-workspace')

        er = self.ssh(
            ['sudo', 'install', '-d', '--owner={}'.format(WORKER_UID),
             '--group={}'.format(WORKER_GID), '/mnt/src'])
        if er.failed():
            return er
        self._timer.report('create-workspace-src')

        if os.path.exists(saved_workspace):
            er = rsync(
                '{}/.'.format(saved_workspace),
                '{}:/mnt/.'.format(self._target))
            if er.failed():
                return er
            self._timer.report('upload-saved-workspace')

        er = rsync(
            '{}/.'.format(source), '{}:/mnt/src/.'.format(self._target))
        if er.failed():
            return er
        self._timer.report('upload-source')
        return er

    def save_workspace(self, saved_workspace):
        """Copy the workspace disk to saved_workspace, then unmount it."""
        er = self.save_helper(saved_workspace)
        if er.failed():
            logging.error('Could not save workspace')
        umount_er = self.umount(WS_DEV)
        return er if er.failed() else umount_er

    def save_helper(self, saved_workspace):
        """Mount WS_DEV on /mnt and rsync it into the local directory."""
        er = self.mount(WS_DEV)
        if er.failed():
            return er
        self._timer.report('mount-workspace')

        os.makedirs(saved_workspace, exist_ok=True)
        ret = rsync(
            '{}:/mnt/.'.format(self._target), '{}/.'.format(saved_workspace))
        self._timer.report('save-workspace')
        return ret

    def kpartx(self, options):
        """Run `sudo kpartx OPTIONS TEMP_IMG` on the manager."""
        er = self.ssh(['sudo', 'kpartx', options, TEMP_IMG])
        if er.failed():
            logging.error('Could not add partitions for worker image')
        return er

    def mount(self, device):
        """Mount device on /mnt on the manager."""
        er = self.ssh(['sudo', 'mount', device, '/mnt'])
        if er.failed():
            logging.error('Could not mount device')
        return er

    def umount(self, device):
        """Unmount device on the manager."""
        er = self.ssh(['sudo', 'umount', device])
        if er.failed():
            logging.error('Could not unmount device')
        return er


class Worker:
    """Operations on the worker VM, tunnelled through the manager VM."""

    def __init__(self, manager, ip):
        self._manager = manager
        self._target = 'worker@{}'.format(ip)

    def ssh(self, argv, quiet=False):
        """Run argv on the worker via an ssh hop from the manager.

        Arguments are quoted here for the worker's shell. Manager.ssh
        quotes them again for the manager's shell.
        """
        argv = [shlex.quote(arg) for arg in argv]
        argv = ['ssh', self._target, '--'] + argv
        return self._manager.ssh(argv, quiet=quiet)

    def install_packages(self, pkgs):
        """Install Debian packages pkgs on the worker non-interactively."""
        return self.ssh(
            ['sudo', 'DEBIAN_FRONTEND=noninteractive',
             'apt-get', '-y', 'install'] + pkgs)

    def build(self, shell_text):
        """Run shell_text with `sh -eux` in /workspace/src on the worker."""
        shell_text = 'cd /workspace/src\n' + shell_text
        return self.ssh(['sh', '-euxc', shell_text])


class Timer:
    """Report the time elapsed since the previous report (or creation)."""

    def __init__(self, report):
        self._report = report
        self._prev = time.time()

    def report(self, msg):
        """Pass "time: <seconds> s <msg>" to the report callback."""
        now = time.time()
        duration = now - self._prev
        self._prev = now
        self._report('time: {:.1f} s {}'.format(duration, msg))


if __name__ == '__main__':
    ContractorApplication().run()