From 6c18d9fca73165b75d61f9ce873ead23d5ad425d Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Thu, 16 Apr 2020 09:17:18 +0300 Subject: Add: abstract running commands over SSH; reduce SSH calls This speeds up a no-op test from about 50 seconds to 30 seconds on my laptop. --- contractor | 666 ++++++++++++++++++++++++++++++++++++------------------------- funcs.py | 1 - worker.yml | 8 +- 3 files changed, 398 insertions(+), 277 deletions(-) diff --git a/contractor b/contractor index 5389a0a..e3475f3 100755 --- a/contractor +++ b/contractor @@ -59,58 +59,89 @@ class ContractorApplication(cliapp.Application): def cmd_build(self, args): timer = Timer(self.verbose) overall = Timer(self.verbose) + dest = self.manager_destination() + manager = RemoteServer(dest, verbose=self.verbose) self.verbose('building using spec at {}'.format(args[0])) bs = self.load_build_spec(args[0]) - m = self.manager(timer) - timer.report('load-build-spec') - self.verbose( - 'uploading image to worker: {}'.format(bs.worker_image())) - if m.upload_worker_image(bs.worker_image()).failed(): - self.error('could not upload image to worker') - sys.exit(1) + self.upload_worker_image(bs.worker_image(), dest) + timer.report('upload-worker-image') - # This might fail. We ignore the failure. - self.verbose('stopping worker') - m.kill_worker() + # Do the minimum needed to start worker VM. The VM takes a + # while to boot and we can do other things while that happens. + execs = [ + GetCPUCount(), + ] + er = self.exec_sequence(manager, *execs) + timer.report('setup') + + # Find number of CPUs. + cpus = 1 + for line in er.stdout.decode('UTF8').splitlines(): + if line.startswith('CPU(s):'): + cpus = int(line.split()[-1]) + + execs = [ + DestroyWorkerVM(), + UndefineWorkerVM(), + CopyWorkerImage(), + StartGuestNetworking(), + CreateWorkerVM(cpus), + TryUnmountWS(), + MountWS(), + ChownWS(), + Mkdir('/mnt/src', owner=WORKER_UID, group=WORKER_GID), + ] + self.exec_sequence(manager, *execs) + timer.report('setup') self.verbose('setting up workspace on worker') + ws = bs.workspace() - source = bs.source() - if m.setup_workspace(ws, source).failed(): - self.error('could not set up workspace') - sys.exit(1) + if os.path.exists(ws): + self.sync_to_workspace(ws, dest, '.') + timer.report('upload-saved-workspace') - self.verbose('starting worker') - w = m.start_worker() - if not w: - self.error('could not start worker') - sys.exit(1) + src = bs.source() + self.sync_to_workspace(src, dest, 'src') + timer.report('upload-source') + + execs = [ + UnmountWS(), + WorkerIP(), + ] + er = self.exec_sequence(manager, *execs) + worker_ip = er.stdout.decode('UTF8').strip() + timer.report('wait-for-worker') + + self.exec_sequence(manager, AttachWS()) + + worker = OnWorker( + dest, 'worker@{}'.format(worker_ip), verbose=self.verbose) + self.exec_sequence(worker, MountWSonWorker()) + execs = [] install = bs.install() if install: - self.verbose( - 'installing packages: {}'.format(', '.join(install))) - if w.install_packages(install).failed(): - self.error('failed to install packages') - sys.exit(1) - else: - self.verbose('no packages to install') - - self.verbose('building') - if w.build(bs.build()).failed(): - self.error('build failed') - sys.exit(1) + execs.append(AptInstall(install)) + + execs.append(Chdir('/workspace/src')) + execs.append(Build(bs.build())) + self.exec_sequence(worker, *execs) timer.report('build') - # This might fail. We ignore the failure. - self.verbose('stopping worker') - m.stop_worker() + execs = [ + ShutdownWorkerVM(), + MountWS(), + ] + self.exec_sequence(manager, *execs) + timer.report('shutdown-worker') if ws: self.verbose('saving workspace to {}'.format(ws)) - m.save_workspace(ws) + self.sync_from_workspace(dest, ws) + timer.report('save-workspace') overall.report('complete-run') self.verbose('build finished OK') @@ -119,18 +150,47 @@ class ContractorApplication(cliapp.Application): with open(filename) as f: return BuildSpec(f.read()) + def upload_worker_image(self, filename, dest): + self.verbose('uploading image to worker: {}'.format(filename)) + target = '{}:{}'.format(dest, WORKER_IMG) + if rsync(filename, target).failed(): + self.error('could not upload image to worker') + sys.exit(1) + + def sync_to_workspace(self, frm, dest, subdir): + er = rsync( + '{}/.'.format(frm), '{}:/mnt/{}/.'.format(dest, subdir)) + if er.failed(): + self.error('Failed to rsync saved workspace to worker') + sys.exit(1) + + def sync_from_workspace(self, dest, ws): + if not os.path.exists(ws): + os.makedirs(ws) + er = rsync('{}:/mnt/.'.format(dest), '{}/.'.format(ws)) + if er.failed(): + self.error('Failed to rsync workspace from worker') + sys.exit(1) + + def exec_sequence(self, manager, *execs): + er = manager.quietly(*execs) + if er.failed(): + self.error('Failed to do that, giving up') + sys.exit(1) + return er + def cmd_manager_status(self, args): - timer = Timer(self.verbose) - m = self.manager(timer) - if m.ssh(['true']).failed(): + dest = self.manager_destination() + manager = RemoteServer(dest) + if manager.quietly(TrueCmd()).failed(): self.error('Manager VM is NOT available') sys.exit(1) self.verbose('Manager VM is available') - def manager(self, timer): + def manager_destination(self): user = self.settings['manager-user'] addr = self.settings['manager-address'] - return Manager(user, addr, timer) + return '{}@{}'.format(user, addr) def error(self, msg): sys.stderr.write('ERROR: {}\n'.format(msg)) @@ -174,288 +234,350 @@ def rsync(filename, target): filename, target, ] logging.info('RSYNC: {!r}'.format(argv)) - p = subprocess.Popen(argv, stderr=STDOUT, stdout=PIPE) + p = subprocess.Popen(argv, stderr=STDOUT, stdout=None) out, err = p.communicate() return ExecResult(out, err, p.returncode) -class BuildSpec: +class RemoteServer: - def __init__(self, yaml_text): - spec = yaml.safe_load(yaml_text) - self._image = os.path.expanduser(self._get(spec, 'worker-image')) - self._source = os.path.expanduser(self._get(spec, 'source')) - self._workspace = os.path.expanduser(self._get(spec, 'workspace', '')) - self._install = self._get(spec, 'install', []) - self._build = self._get(spec, 'build') + def __init__(self, ssh_destination, verbose=None): + self._dest = ssh_destination + self._verbose = verbose - def worker_image(self): - return self._image + def quietly(self, *execs): + return self._execute(*execs, quiet=True) - def install(self): - return self._install + def verbosely(self, *execs): + return self._execute(*execs, quiet=False) - def source(self): - return self._source + def _msg(self, execs): + if self._verbose is not None: + for e in execs: + m = e.msg() or e.__class__.__name__ + self._verbose(' ' + m) - def workspace(self): - return self._workspace + def _argv(self, execs): + self._msg(execs) + snippets = [e.as_shell() for e in execs] + return ['sh', '-euc', '; '.join(snippets)] - def build(self): - return self._build + def _execute(self, *execs, quiet=None): + assert quiet is not None + return ssh(self._dest, self._argv(execs), quiet=quiet) - def as_dict(self): - return { - 'worker-image': self.worker_image(), - 'install': self.install(), - 'source': self.source(), - 'workspace': self.workspace(), - 'build': self.build(), - } - def _get(self, spec, key, default=None): - v = spec.get(key, default) - if v is None: - raise SpecMissingKey(key) - return v +class OnWorker(RemoteServer): + def __init__(self, manager, worker, verbose=None): + self._dest = manager + self._prefix = ['ssh', worker, '--'] + self._verbose = verbose -class SpecMissingKey(Exception): + def _execute(self, *execs, quiet=None): + assert quiet is not None + argv = [shlex.quote(a) for a in self._argv(execs)] + return ssh(self._dest, self._prefix + argv, quiet=quiet) + + +class RemoteExecution: + + def msg(self): + return None + + def argv(self): + raise NotImplementedError() + + def as_shell(self): + return ' '.join(shlex.quote(arg) for arg in self.argv()) + + +class MayFail(RemoteExecution): + + def as_shell(self): + return super().as_shell() + '|| true' + + +class TrueCmd(RemoteExecution): + + def msg(self): + return 'can we run true?' + + def argv(self): + return ['true'] + + +class AptInstall(RemoteExecution): + + def __init__(self, packages): + self._packages = packages + + def msg(self): + return 'installing packages: ' + ' '.join(self._packages) + + def argv(self): + return ['sudo', 'DEBIAN_FRONTEND=noninteractive', + 'apt-get', '-y', 'install'] + self._packages + + +class Chdir(RemoteExecution): + + def __init__(self, pathname): + self._pathname = pathname + + def msg(self): + return 'cd {}'.format(self._pathname) + + def argv(self): + return ['cd', self._pathname] + + +class Build(RemoteExecution): + + def __init__(self, shell): + self._shell = shell + + def msg(self): + return 'building' + + def argv(self): + return ['sh', '-euxc', self._shell] + + +def virsh(*args): + return ['virsh', '-c', 'qemu:///system'] + list(args) + + +class DestroyWorkerVM(MayFail): + + def msg(self): + return 'destroying worker VM' + + def argv(self): + return virsh('destroy', 'worker') + + +class UndefineWorkerVM(MayFail): + + def msg(self): + return 'undefining worker VM' + + def argv(self): + return virsh('undefine', 'worker') + + +class ShutdownWorkerVM(RemoteExecution): + + def msg(self): + return 'shutting down worker VM cleanly' + + def argv(self): + return ['sh', '-c', ''' +virsh -c qemu:///system shutdown worker +while virsh -c qemu:///system list --name | grep +do + sleep 0 +done +'''] - def __init__(self, key): - super().__init__( - 'Build specification is missing required key {!r}'.format(key)) +class CopyWorkerImage(RemoteExecution): -class Manager: - - def __init__(self, user, addr, timer): - self._target = '{}@{}'.format(user, addr) - self._timer = timer - - def ssh(self, argv, quiet=False): - return ssh(self._target, argv, quiet=quiet) - - def virsh(self, argv): - ret = ssh( - self._target, - ['virsh', '-c', 'qemu:///system'] + argv, - quiet=True) - self._timer.report('virsh {}'.format(' '.join(argv))) - return ret - - def kill_worker(self): - self.virsh(['destroy', 'worker']) - self.virsh(['undefine', 'worker']) - - def stop_worker(self): - self.virsh(['shutdown', 'worker']) - while True: - er = self.virsh(['list', '--name']) - if er.failed(): - break - if er.stdout.strip() == b'': - break - time.sleep(2) - - def upload_worker_image(self, filename): - ret = rsync(filename, '{}:{}'.format(self._target, WORKER_IMG)) - self._timer.report('upload-worker-image') - return ret - - def start_worker(self): - if self.copy_worker_image().failed(): - logging.error('Could not copy worker image for new instance') - return None - - # We ignore failures on these, as they shouldn't matter. - self.virsh(['net-autostart', 'default']) - self.virsh(['net-start', 'default']) - - n = self.get_cpu_count() - if not isinstance(n, int): - logging.error('Could not start worker due to missing CPU count') - return None - self._timer.report('get-cpu-count') - - er = self.ssh([ + def msg(self): + return 'copying image file for new worker VM' + + def argv(self): + return ['sh', '-c', 'rm -f temp.img; cp worker.img temp.img'] + self.ssh(['rm', '-f', TEMP_IMG]) + ret = self.ssh(['cp', WORKER_IMG, TEMP_IMG]) + + +class StartGuestNetworking(MayFail): + + def msg(self): + return 'starting guest network' + + def argv(self): + return virsh('net-start', 'default') + + +class MountWS(RemoteExecution): + + def msg(self): + return 'mounting workspace on manager' + + def argv(self): + return ['sudo', 'mount', WS_DEV, '/mnt'] + + +class MountWSonWorker(RemoteExecution): + + def msg(self): + return 'mounting workspace on worker' + + def argv(self): + return ['sudo', 'mount', '/dev/vdb', '/workspace'] + + +class TryUnmountWS(MayFail): + + def msg(self): + return 'trying to unmount workspace on manager' + + def argv(self): + return ['sudo', 'umount', '--quiet', WS_DEV] + + +class UnmountWS(RemoteExecution): + + def msg(self): + return 'unmounting workspace on manager' + + def argv(self): + return ['sudo', 'umount', '--quiet', WS_DEV] + + +class ChownWS(RemoteExecution): + + def msg(self): + return 'set ownerships on workspace' + + def argv(self): + return ['sudo', 'chown', '-R', + '{}:{}'.format(WORKER_UID, WORKER_GID), '/mnt'] + + +class Mkdir(RemoteExecution): + + def __init__(self, pathname, owner='root', group='root', mode=0o755): + self._argv = [ + 'install', '-d', + '-o', str(owner), + '-g', str(group), + '-m', '{:o}'.format(mode), + pathname, + ] + self._pathname = pathname + + def msg(self): + return 'create {}'.format(self._pathname) + + def argv(self): + return self._argv + + +class GetCPUCount(RemoteExecution): + + def msg(self): + return 'get CPU count' + + def argv(self): + return ['lscpu'] + + +class CreateWorkerVM(RemoteExecution): + + def __init__(self, manager_cpus): + self._worker_cpus = max(manager_cpus - 1, 1) + + def msg(self): + return 'creating worker VM' + + def argv(self): + return [ 'virt-install', '--connect', 'qemu:///system', '--quiet', '--name=worker', '--memory=4096', - '--vcpus={}'.format(n-1), + '--vcpus={}'.format(self._worker_cpus), '--cpu=host', '--import', '--os-variant=debian9', '--disk=path={},cache=none'.format(TEMP_IMG), - '--disk=path={},cache=none'.format(WS_DEV), '--network=network=default', '--graphics=spice', '--noautoconsole', - ]) - if er.failed(): - logging.error('Could not create worker VM') - return None + ] - self._timer.report('virt-install') - return self.wait_for_worker() - def copy_worker_image(self): - self.ssh(['rm', '-f', TEMP_IMG]) - ret = self.ssh(['cp', WORKER_IMG, TEMP_IMG]) - self._timer.report('copy-worker-image') - return ret +class AttachWS(RemoteExecution): - def get_cpu_count(self): - er = self.ssh(['lscpu'], quiet=True) - if er.failed(): - logging.error('lscpu on manager failed: {!r}'.format(err)) - return None + def msg(self): + return 'attach workspace disk to worker' - for line in er.stdout.decode('UTF8').splitlines(): - if line.startswith('CPU(s):'): - return int(line.split()[-1]) + def argv(self): + return virsh('attach-disk', 'worker', WS_DEV, 'vdb', + '--targetbus', 'virtio', '--live') - logging.error('Could not find number of CPUs on manager') - return None - def worker_ip(self): - filename = '/var/lib/libvirt/dnsmasq/virbr0.status' - er = self.ssh(['cat', filename], quiet=True) - if er.failed(): - logging.error('Could not read dnsmasq status file') - return None - - if not er.stdout: - return None - - status = json.loads(er.stdout) - if not status: - return None - - status.sort(key=lambda e: e['expiry-time']) - self._timer.report('worker-ip') - return status[-1]['ip-address'] - - def wait_for_worker(self): - # We look up the IP and try to use it. The IP might be for a - # previous worker instance. - while True: - ip = self.worker_ip() - if ip is None: - continue - w = Worker(self, ip) - self.ssh(['ssh-keygen', '-R', ip]) - if not w.ssh(['true'], quiet=True).failed(): - self._timer.report('wait-for-worker') - return w - time.sleep(2) - - def setup_workspace(self, saved_workspace, source): - er = self.setup_helper(saved_workspace, source) - if er.failed(): - logging.error('Could not set up workspace') - self.umount(WS_DEV) - return er +class WorkerIP(RemoteExecution): - def setup_helper(self, saved_workspace, source): - er = self.mount(WS_DEV) - if er.failed(): - return er - self._timer.report('mount-workspace') + def msg(self): + return 'get worker IP' - er = self.ssh( - ['sudo', 'chown', '-R', '{}:{}'.format(WORKER_UID, WORKER_GID), - '/mnt']) - if er.failed(): - return er - self._timer.report('chown-workspace') + def argv(self): + return ['sh', '-euc', ''' +status=/var/lib/libvirt/dnsmasq/virbr0.status +while true +do + ip="$(jq -r '.[-1]["ip-address"]' "$status")" + ssh-keygen -R "$ip" 2> /dev/null > /dev/null + if ssh "worker@$ip" true 2> /dev/null + then + break + fi +done +echo "$ip" +'''] - er = self.ssh( - ['sudo', 'install', '-d', '--owner={}'.format(WORKER_UID), - '--group={}'.format(WORKER_GID), '/mnt/src']) - if er.failed(): - return er - self._timer.report('create-workspace-src') - - if os.path.exists(saved_workspace): - er = rsync( - '{}/.'.format(saved_workspace), - '{}:/mnt/.'.format(self._target)) - if er.failed(): - return er - self._timer.report('upload-saved-workspace') - - er = rsync( - '{}/.'.format(source), - '{}:/mnt/src/.'.format(self._target)) - if er.failed(): - return er - self._timer.report('upload-source') - - return er - def save_workspace(self, saved_workspace): - er = self.save_helper(saved_workspace) - if er.failed(): - logging.error('Could not save workspace') - self.umount(WS_DEV) - return er - - def save_helper(self, saved_workspace): - er = self.mount(WS_DEV) - if er.failed(): - return er - self._timer.report('mount-workspace') +class BuildSpec: - if not os.path.exists(saved_workspace): - os.makedirs(saved_workspace) + def __init__(self, yaml_text): + spec = yaml.safe_load(yaml_text) + self._image = os.path.expanduser(self._get(spec, 'worker-image')) + self._source = os.path.expanduser(self._get(spec, 'source')) + self._workspace = os.path.expanduser(self._get(spec, 'workspace', '')) + self._install = self._get(spec, 'install', []) + self._build = self._get(spec, 'build') - ret = rsync( - '{}:/mnt/.'.format(self._target), - '{}/.'.format(saved_workspace)) - self._timer.report('save-workspace') - return ret + def worker_image(self): + return self._image - def kpartx(self, options): - er = self.ssh(['sudo', 'kpartx', options, TEMP_IMG]) - if er.failed(): - logging.error('Could not add partitions for worker image') - return er + def install(self): + return self._install - def mount(self, device): - er = self.ssh(['sudo', 'mount', device, '/mnt']) - if er.failed(): - logging.error('Could not mount device') - return er + def source(self): + return self._source - def umount(self, device): - er = self.ssh(['sudo', 'umount', device]) - if er.failed(): - logging.error('Could not mount device') - return er + def workspace(self): + return self._workspace + def build(self): + return self._build -class Worker: + def as_dict(self): + return { + 'worker-image': self.worker_image(), + 'install': self.install(), + 'source': self.source(), + 'workspace': self.workspace(), + 'build': self.build(), + } - def __init__(self, manager, ip): - self._manager = manager - self._target = 'worker@{}'.format(ip) + def _get(self, spec, key, default=None): + v = spec.get(key, default) + if v is None: + raise SpecMissingKey(key) + return v - def ssh(self, argv, quiet=False): - argv = [shlex.quote(arg) for arg in argv] - argv = ['ssh', self._target, '--'] + argv - return self._manager.ssh(argv, quiet=quiet) - def install_packages(self, pkgs): - return self.ssh( - ['sudo', 'DEBIAN_FRONTEND=noninteractive', - 'apt-get', '-y', 'install'] + pkgs) +class SpecMissingKey(Exception): - def build(self, shell_text): - shell_text = 'cd /workspace/src\n' + shell_text - return self.ssh(['sh', '-euxc', shell_text]) + def __init__(self, key): + super().__init__( + 'Build specification is missing required key {!r}'.format(key)) class Timer: diff --git a/funcs.py b/funcs.py index ada6e7d..ecc7d81 100644 --- a/funcs.py +++ b/funcs.py @@ -79,7 +79,6 @@ def run_contractor_dump(ctx, filename=None): # Run the contractor to do a build. def run_contractor_build(ctx, filename=None): - pass argv = _contractor() + ['build', filename] _run(ctx, argv) diff --git a/worker.yml b/worker.yml index 378d0b1..7551a56 100644 --- a/worker.yml +++ b/worker.yml @@ -45,10 +45,10 @@ - file: state: directory path: /workspace - - lineinfile: - path: /etc/fstab - regexp: '^/dev/vdb' - line: '/dev/vdb /workspace ext4 defaults 0 2' + # - lineinfile: + # path: /etc/fstab + # regexp: '^/dev/vdb' + # line: '/dev/vdb /workspace ext4 defaults 0 2' vars: host: worker ansible_python_interpreter: /usr/bin/python3 -- cgit v1.2.1