author     Lars Wirzenius <liw@liw.fi>    2020-04-16 09:17:18 +0300
committer  Lars Wirzenius <liw@liw.fi>    2020-04-17 21:27:06 +0300
commit     6c18d9fca73165b75d61f9ce873ead23d5ad425d (patch)
tree       a8e691b3139abdfe9d2b60c1caec69e3cd417b3e
parent     b4ab447f0e86b06b898018701c1fbd8a568fc850 (diff)
download   ick-contractor-6c18d9fca73165b75d61f9ce873ead23d5ad425d.tar.gz
Add: abstract running commands over SSH; reduce SSH calls
This speeds up a no-op test from about 50 seconds to 30 seconds on my laptop.
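
The new RemoteServer/RemoteExecution classes below carry the idea: each remote step renders itself as a quoted shell snippet, and a sequence of steps is joined with '; ' and shipped over a single ssh invocation instead of one ssh round trip per step. A minimal sketch of that idea (simplified names; Uname, MakeDir and the bare execute()/subprocess wiring are illustrative stand-ins, not the code in this commit):

    import shlex
    import subprocess


    class RemoteExecution:

        def argv(self):
            raise NotImplementedError()

        def as_shell(self):
            # Quote every argument so the snippet survives the remote 'sh -c'.
            return ' '.join(shlex.quote(a) for a in self.argv())


    class Uname(RemoteExecution):
        # Illustrative command; the commit defines GetCPUCount, CreateWorkerVM,
        # MountWS, AptInstall, Build and friends instead.

        def argv(self):
            return ['uname', '-a']


    class MakeDir(RemoteExecution):
        # Simplified stand-in for the commit's Mkdir, which also sets owner/group/mode.

        def __init__(self, pathname):
            self._pathname = pathname

        def argv(self):
            return ['install', '-d', self._pathname]


    class RemoteServer:

        def __init__(self, ssh_destination):
            self._dest = ssh_destination

        def execute(self, *execs):
            # Batch every snippet into one script so the whole sequence costs a
            # single SSH round trip; 'sh -eu' aborts the batch on the first failure.
            script = '; '.join(e.as_shell() for e in execs)
            remote = 'sh -euc {}'.format(shlex.quote(script))
            return subprocess.run(['ssh', self._dest, remote], capture_output=True)


    # For example:
    #   RemoteServer('user@manager.example').execute(Uname(), MakeDir('/tmp/ws'))

In the commit itself RemoteServer goes through the existing ssh() helper and exposes quietly()/verbosely() rather than a bare execute(), and OnWorker layers a second ssh hop (manager to worker VM) on top of the same batching.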
-rwxr-xr-x  contractor   666
-rw-r--r--  funcs.py       1
-rw-r--r--  worker.yml     8
3 files changed, 398 insertions, 277 deletions
diff --git a/contractor b/contractor
index 5389a0a..e3475f3 100755
--- a/contractor
+++ b/contractor
@@ -59,58 +59,89 @@ class ContractorApplication(cliapp.Application):
def cmd_build(self, args):
timer = Timer(self.verbose)
overall = Timer(self.verbose)
+ dest = self.manager_destination()
+ manager = RemoteServer(dest, verbose=self.verbose)
self.verbose('building using spec at {}'.format(args[0]))
bs = self.load_build_spec(args[0])
- m = self.manager(timer)
- timer.report('load-build-spec')
- self.verbose(
- 'uploading image to worker: {}'.format(bs.worker_image()))
- if m.upload_worker_image(bs.worker_image()).failed():
- self.error('could not upload image to worker')
- sys.exit(1)
+ self.upload_worker_image(bs.worker_image(), dest)
+ timer.report('upload-worker-image')
- # This might fail. We ignore the failure.
- self.verbose('stopping worker')
- m.kill_worker()
+ # Do the minimum needed to start worker VM. The VM takes a
+ # while to boot and we can do other things while that happens.
+ execs = [
+ GetCPUCount(),
+ ]
+ er = self.exec_sequence(manager, *execs)
+ timer.report('setup')
+
+ # Find number of CPUs.
+ cpus = 1
+ for line in er.stdout.decode('UTF8').splitlines():
+ if line.startswith('CPU(s):'):
+ cpus = int(line.split()[-1])
+
+ execs = [
+ DestroyWorkerVM(),
+ UndefineWorkerVM(),
+ CopyWorkerImage(),
+ StartGuestNetworking(),
+ CreateWorkerVM(cpus),
+ TryUnmountWS(),
+ MountWS(),
+ ChownWS(),
+ Mkdir('/mnt/src', owner=WORKER_UID, group=WORKER_GID),
+ ]
+ self.exec_sequence(manager, *execs)
+ timer.report('setup')
self.verbose('setting up workspace on worker')
+
ws = bs.workspace()
- source = bs.source()
- if m.setup_workspace(ws, source).failed():
- self.error('could not set up workspace')
- sys.exit(1)
+ if os.path.exists(ws):
+ self.sync_to_workspace(ws, dest, '.')
+ timer.report('upload-saved-workspace')
- self.verbose('starting worker')
- w = m.start_worker()
- if not w:
- self.error('could not start worker')
- sys.exit(1)
+ src = bs.source()
+ self.sync_to_workspace(src, dest, 'src')
+ timer.report('upload-source')
+
+ execs = [
+ UnmountWS(),
+ WorkerIP(),
+ ]
+ er = self.exec_sequence(manager, *execs)
+ worker_ip = er.stdout.decode('UTF8').strip()
+ timer.report('wait-for-worker')
+
+ self.exec_sequence(manager, AttachWS())
+
+ worker = OnWorker(
+ dest, 'worker@{}'.format(worker_ip), verbose=self.verbose)
+ self.exec_sequence(worker, MountWSonWorker())
+ execs = []
install = bs.install()
if install:
- self.verbose(
- 'installing packages: {}'.format(', '.join(install)))
- if w.install_packages(install).failed():
- self.error('failed to install packages')
- sys.exit(1)
- else:
- self.verbose('no packages to install')
-
- self.verbose('building')
- if w.build(bs.build()).failed():
- self.error('build failed')
- sys.exit(1)
+ execs.append(AptInstall(install))
+
+ execs.append(Chdir('/workspace/src'))
+ execs.append(Build(bs.build()))
+ self.exec_sequence(worker, *execs)
timer.report('build')
- # This might fail. We ignore the failure.
- self.verbose('stopping worker')
- m.stop_worker()
+ execs = [
+ ShutdownWorkerVM(),
+ MountWS(),
+ ]
+ self.exec_sequence(manager, *execs)
+ timer.report('shutdown-worker')
if ws:
self.verbose('saving workspace to {}'.format(ws))
- m.save_workspace(ws)
+ self.sync_from_workspace(dest, ws)
+ timer.report('save-workspace')
overall.report('complete-run')
self.verbose('build finished OK')
@@ -119,18 +150,47 @@ class ContractorApplication(cliapp.Application):
with open(filename) as f:
return BuildSpec(f.read())
+ def upload_worker_image(self, filename, dest):
+ self.verbose('uploading image to worker: {}'.format(filename))
+ target = '{}:{}'.format(dest, WORKER_IMG)
+ if rsync(filename, target).failed():
+ self.error('could not upload image to worker')
+ sys.exit(1)
+
+ def sync_to_workspace(self, frm, dest, subdir):
+ er = rsync(
+ '{}/.'.format(frm), '{}:/mnt/{}/.'.format(dest, subdir))
+ if er.failed():
+            self.error('Failed to rsync {} to workspace on worker'.format(frm))
+ sys.exit(1)
+
+ def sync_from_workspace(self, dest, ws):
+ if not os.path.exists(ws):
+ os.makedirs(ws)
+ er = rsync('{}:/mnt/.'.format(dest), '{}/.'.format(ws))
+ if er.failed():
+ self.error('Failed to rsync workspace from worker')
+ sys.exit(1)
+
+ def exec_sequence(self, manager, *execs):
+ er = manager.quietly(*execs)
+ if er.failed():
+ self.error('Failed to do that, giving up')
+ sys.exit(1)
+ return er
+
def cmd_manager_status(self, args):
- timer = Timer(self.verbose)
- m = self.manager(timer)
- if m.ssh(['true']).failed():
+ dest = self.manager_destination()
+ manager = RemoteServer(dest)
+ if manager.quietly(TrueCmd()).failed():
self.error('Manager VM is NOT available')
sys.exit(1)
self.verbose('Manager VM is available')
- def manager(self, timer):
+ def manager_destination(self):
user = self.settings['manager-user']
addr = self.settings['manager-address']
- return Manager(user, addr, timer)
+ return '{}@{}'.format(user, addr)
def error(self, msg):
sys.stderr.write('ERROR: {}\n'.format(msg))
@@ -174,288 +234,350 @@ def rsync(filename, target):
filename, target,
]
logging.info('RSYNC: {!r}'.format(argv))
- p = subprocess.Popen(argv, stderr=STDOUT, stdout=PIPE)
+ p = subprocess.Popen(argv, stderr=STDOUT, stdout=None)
out, err = p.communicate()
return ExecResult(out, err, p.returncode)
-class BuildSpec:
+class RemoteServer:
- def __init__(self, yaml_text):
- spec = yaml.safe_load(yaml_text)
- self._image = os.path.expanduser(self._get(spec, 'worker-image'))
- self._source = os.path.expanduser(self._get(spec, 'source'))
- self._workspace = os.path.expanduser(self._get(spec, 'workspace', ''))
- self._install = self._get(spec, 'install', [])
- self._build = self._get(spec, 'build')
+ def __init__(self, ssh_destination, verbose=None):
+ self._dest = ssh_destination
+ self._verbose = verbose
- def worker_image(self):
- return self._image
+ def quietly(self, *execs):
+ return self._execute(*execs, quiet=True)
- def install(self):
- return self._install
+ def verbosely(self, *execs):
+ return self._execute(*execs, quiet=False)
- def source(self):
- return self._source
+ def _msg(self, execs):
+ if self._verbose is not None:
+ for e in execs:
+ m = e.msg() or e.__class__.__name__
+ self._verbose(' ' + m)
- def workspace(self):
- return self._workspace
+ def _argv(self, execs):
+ self._msg(execs)
+ snippets = [e.as_shell() for e in execs]
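+        # Joining the snippets with '; ' means a whole sequence of steps costs
+        # one SSH call; 'sh -eu' stops the batch at the first failure, except
+        # for MayFail snippets, which append '|| true' to opt out.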
+ return ['sh', '-euc', '; '.join(snippets)]
- def build(self):
- return self._build
+ def _execute(self, *execs, quiet=None):
+ assert quiet is not None
+ return ssh(self._dest, self._argv(execs), quiet=quiet)
- def as_dict(self):
- return {
- 'worker-image': self.worker_image(),
- 'install': self.install(),
- 'source': self.source(),
- 'workspace': self.workspace(),
- 'build': self.build(),
- }
- def _get(self, spec, key, default=None):
- v = spec.get(key, default)
- if v is None:
- raise SpecMissingKey(key)
- return v
+class OnWorker(RemoteServer):
+ def __init__(self, manager, worker, verbose=None):
+ self._dest = manager
+ self._prefix = ['ssh', worker, '--']
+ self._verbose = verbose
-class SpecMissingKey(Exception):
+ def _execute(self, *execs, quiet=None):
+ assert quiet is not None
+ argv = [shlex.quote(a) for a in self._argv(execs)]
+ return ssh(self._dest, self._prefix + argv, quiet=quiet)
+
+
+class RemoteExecution:
+
+ def msg(self):
+ return None
+
+ def argv(self):
+ raise NotImplementedError()
+
+ def as_shell(self):
+ return ' '.join(shlex.quote(arg) for arg in self.argv())
+
+
+class MayFail(RemoteExecution):
+
+ def as_shell(self):
+        return super().as_shell() + ' || true'
+
+
+class TrueCmd(RemoteExecution):
+
+ def msg(self):
+ return 'can we run true?'
+
+ def argv(self):
+ return ['true']
+
+
+class AptInstall(RemoteExecution):
+
+ def __init__(self, packages):
+ self._packages = packages
+
+ def msg(self):
+ return 'installing packages: ' + ' '.join(self._packages)
+
+ def argv(self):
+ return ['sudo', 'DEBIAN_FRONTEND=noninteractive',
+ 'apt-get', '-y', 'install'] + self._packages
+
+
+class Chdir(RemoteExecution):
+
+ def __init__(self, pathname):
+ self._pathname = pathname
+
+ def msg(self):
+ return 'cd {}'.format(self._pathname)
+
+ def argv(self):
+ return ['cd', self._pathname]
+
+
+class Build(RemoteExecution):
+
+ def __init__(self, shell):
+ self._shell = shell
+
+ def msg(self):
+ return 'building'
+
+ def argv(self):
+ return ['sh', '-euxc', self._shell]
+
+
+def virsh(*args):
+ return ['virsh', '-c', 'qemu:///system'] + list(args)
+
+
+class DestroyWorkerVM(MayFail):
+
+ def msg(self):
+ return 'destroying worker VM'
+
+ def argv(self):
+ return virsh('destroy', 'worker')
+
+
+class UndefineWorkerVM(MayFail):
+
+ def msg(self):
+ return 'undefining worker VM'
+
+ def argv(self):
+ return virsh('undefine', 'worker')
+
+
+class ShutdownWorkerVM(RemoteExecution):
+
+ def msg(self):
+ return 'shutting down worker VM cleanly'
+
+ def argv(self):
+        return ['sh', '-c', '''
+virsh -c qemu:///system shutdown worker
+# wait until the worker VM no longer shows up among the running VMs
+while virsh -c qemu:///system list --name | grep -q worker
+do
+    sleep 2
+done
+''']
- def __init__(self, key):
- super().__init__(
- 'Build specification is missing required key {!r}'.format(key))
+class CopyWorkerImage(RemoteExecution):
-class Manager:
-
- def __init__(self, user, addr, timer):
- self._target = '{}@{}'.format(user, addr)
- self._timer = timer
-
- def ssh(self, argv, quiet=False):
- return ssh(self._target, argv, quiet=quiet)
-
- def virsh(self, argv):
- ret = ssh(
- self._target,
- ['virsh', '-c', 'qemu:///system'] + argv,
- quiet=True)
- self._timer.report('virsh {}'.format(' '.join(argv)))
- return ret
-
- def kill_worker(self):
- self.virsh(['destroy', 'worker'])
- self.virsh(['undefine', 'worker'])
-
- def stop_worker(self):
- self.virsh(['shutdown', 'worker'])
- while True:
- er = self.virsh(['list', '--name'])
- if er.failed():
- break
- if er.stdout.strip() == b'':
- break
- time.sleep(2)
-
- def upload_worker_image(self, filename):
- ret = rsync(filename, '{}:{}'.format(self._target, WORKER_IMG))
- self._timer.report('upload-worker-image')
- return ret
-
- def start_worker(self):
- if self.copy_worker_image().failed():
- logging.error('Could not copy worker image for new instance')
- return None
-
- # We ignore failures on these, as they shouldn't matter.
- self.virsh(['net-autostart', 'default'])
- self.virsh(['net-start', 'default'])
-
- n = self.get_cpu_count()
- if not isinstance(n, int):
- logging.error('Could not start worker due to missing CPU count')
- return None
- self._timer.report('get-cpu-count')
-
- er = self.ssh([
+ def msg(self):
+ return 'copying image file for new worker VM'
+
+ def argv(self):
+ return ['sh', '-c', 'rm -f temp.img; cp worker.img temp.img']
+
+
+class StartGuestNetworking(MayFail):
+
+ def msg(self):
+ return 'starting guest network'
+
+ def argv(self):
+ return virsh('net-start', 'default')
+
+
+class MountWS(RemoteExecution):
+
+ def msg(self):
+ return 'mounting workspace on manager'
+
+ def argv(self):
+ return ['sudo', 'mount', WS_DEV, '/mnt']
+
+
+class MountWSonWorker(RemoteExecution):
+
+ def msg(self):
+ return 'mounting workspace on worker'
+
+ def argv(self):
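+        # /dev/vdb is how the workspace disk appears inside the worker VM:
+        # AttachWS attaches WS_DEV to the worker with target 'vdb'.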
+ return ['sudo', 'mount', '/dev/vdb', '/workspace']
+
+
+class TryUnmountWS(MayFail):
+
+ def msg(self):
+ return 'trying to unmount workspace on manager'
+
+ def argv(self):
+ return ['sudo', 'umount', '--quiet', WS_DEV]
+
+
+class UnmountWS(RemoteExecution):
+
+ def msg(self):
+ return 'unmounting workspace on manager'
+
+ def argv(self):
+ return ['sudo', 'umount', '--quiet', WS_DEV]
+
+
+class ChownWS(RemoteExecution):
+
+ def msg(self):
+ return 'set ownerships on workspace'
+
+ def argv(self):
+ return ['sudo', 'chown', '-R',
+ '{}:{}'.format(WORKER_UID, WORKER_GID), '/mnt']
+
+
+class Mkdir(RemoteExecution):
+
+ def __init__(self, pathname, owner='root', group='root', mode=0o755):
+ self._argv = [
+ 'install', '-d',
+ '-o', str(owner),
+ '-g', str(group),
+ '-m', '{:o}'.format(mode),
+ pathname,
+ ]
+ self._pathname = pathname
+
+ def msg(self):
+ return 'create {}'.format(self._pathname)
+
+ def argv(self):
+ return self._argv
+
+
+class GetCPUCount(RemoteExecution):
+
+ def msg(self):
+ return 'get CPU count'
+
+ def argv(self):
+ return ['lscpu']
+
+
+class CreateWorkerVM(RemoteExecution):
+
+ def __init__(self, manager_cpus):
+ self._worker_cpus = max(manager_cpus - 1, 1)
+
+ def msg(self):
+ return 'creating worker VM'
+
+ def argv(self):
+ return [
'virt-install',
'--connect', 'qemu:///system',
'--quiet',
'--name=worker',
'--memory=4096',
- '--vcpus={}'.format(n-1),
+ '--vcpus={}'.format(self._worker_cpus),
'--cpu=host',
'--import',
'--os-variant=debian9',
'--disk=path={},cache=none'.format(TEMP_IMG),
- '--disk=path={},cache=none'.format(WS_DEV),
'--network=network=default',
'--graphics=spice',
'--noautoconsole',
- ])
- if er.failed():
- logging.error('Could not create worker VM')
- return None
+ ]
- self._timer.report('virt-install')
- return self.wait_for_worker()
- def copy_worker_image(self):
- self.ssh(['rm', '-f', TEMP_IMG])
- ret = self.ssh(['cp', WORKER_IMG, TEMP_IMG])
- self._timer.report('copy-worker-image')
- return ret
+class AttachWS(RemoteExecution):
- def get_cpu_count(self):
- er = self.ssh(['lscpu'], quiet=True)
- if er.failed():
- logging.error('lscpu on manager failed: {!r}'.format(err))
- return None
+ def msg(self):
+ return 'attach workspace disk to worker'
- for line in er.stdout.decode('UTF8').splitlines():
- if line.startswith('CPU(s):'):
- return int(line.split()[-1])
+ def argv(self):
+ return virsh('attach-disk', 'worker', WS_DEV, 'vdb',
+ '--targetbus', 'virtio', '--live')
- logging.error('Could not find number of CPUs on manager')
- return None
- def worker_ip(self):
- filename = '/var/lib/libvirt/dnsmasq/virbr0.status'
- er = self.ssh(['cat', filename], quiet=True)
- if er.failed():
- logging.error('Could not read dnsmasq status file')
- return None
-
- if not er.stdout:
- return None
-
- status = json.loads(er.stdout)
- if not status:
- return None
-
- status.sort(key=lambda e: e['expiry-time'])
- self._timer.report('worker-ip')
- return status[-1]['ip-address']
-
- def wait_for_worker(self):
- # We look up the IP and try to use it. The IP might be for a
- # previous worker instance.
- while True:
- ip = self.worker_ip()
- if ip is None:
- continue
- w = Worker(self, ip)
- self.ssh(['ssh-keygen', '-R', ip])
- if not w.ssh(['true'], quiet=True).failed():
- self._timer.report('wait-for-worker')
- return w
- time.sleep(2)
-
- def setup_workspace(self, saved_workspace, source):
- er = self.setup_helper(saved_workspace, source)
- if er.failed():
- logging.error('Could not set up workspace')
- self.umount(WS_DEV)
- return er
+class WorkerIP(RemoteExecution):
- def setup_helper(self, saved_workspace, source):
- er = self.mount(WS_DEV)
- if er.failed():
- return er
- self._timer.report('mount-workspace')
+ def msg(self):
+ return 'get worker IP'
- er = self.ssh(
- ['sudo', 'chown', '-R', '{}:{}'.format(WORKER_UID, WORKER_GID),
- '/mnt'])
- if er.failed():
- return er
- self._timer.report('chown-workspace')
+ def argv(self):
+ return ['sh', '-euc', '''
+status=/var/lib/libvirt/dnsmasq/virbr0.status
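+# Poll the last dnsmasq lease until the host at that address answers SSH as
+# the worker; drop any stale host key first (the IP may be a previous worker's).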
+while true
+do
+ ip="$(jq -r '.[-1]["ip-address"]' "$status")"
+ ssh-keygen -R "$ip" 2> /dev/null > /dev/null
+ if ssh "worker@$ip" true 2> /dev/null
+ then
+ break
+ fi
+done
+echo "$ip"
+''']
- er = self.ssh(
- ['sudo', 'install', '-d', '--owner={}'.format(WORKER_UID),
- '--group={}'.format(WORKER_GID), '/mnt/src'])
- if er.failed():
- return er
- self._timer.report('create-workspace-src')
-
- if os.path.exists(saved_workspace):
- er = rsync(
- '{}/.'.format(saved_workspace),
- '{}:/mnt/.'.format(self._target))
- if er.failed():
- return er
- self._timer.report('upload-saved-workspace')
-
- er = rsync(
- '{}/.'.format(source),
- '{}:/mnt/src/.'.format(self._target))
- if er.failed():
- return er
- self._timer.report('upload-source')
-
- return er
- def save_workspace(self, saved_workspace):
- er = self.save_helper(saved_workspace)
- if er.failed():
- logging.error('Could not save workspace')
- self.umount(WS_DEV)
- return er
-
- def save_helper(self, saved_workspace):
- er = self.mount(WS_DEV)
- if er.failed():
- return er
- self._timer.report('mount-workspace')
+class BuildSpec:
- if not os.path.exists(saved_workspace):
- os.makedirs(saved_workspace)
+ def __init__(self, yaml_text):
+ spec = yaml.safe_load(yaml_text)
+ self._image = os.path.expanduser(self._get(spec, 'worker-image'))
+ self._source = os.path.expanduser(self._get(spec, 'source'))
+ self._workspace = os.path.expanduser(self._get(spec, 'workspace', ''))
+ self._install = self._get(spec, 'install', [])
+ self._build = self._get(spec, 'build')
- ret = rsync(
- '{}:/mnt/.'.format(self._target),
- '{}/.'.format(saved_workspace))
- self._timer.report('save-workspace')
- return ret
+ def worker_image(self):
+ return self._image
- def kpartx(self, options):
- er = self.ssh(['sudo', 'kpartx', options, TEMP_IMG])
- if er.failed():
- logging.error('Could not add partitions for worker image')
- return er
+ def install(self):
+ return self._install
- def mount(self, device):
- er = self.ssh(['sudo', 'mount', device, '/mnt'])
- if er.failed():
- logging.error('Could not mount device')
- return er
+ def source(self):
+ return self._source
- def umount(self, device):
- er = self.ssh(['sudo', 'umount', device])
- if er.failed():
- logging.error('Could not mount device')
- return er
+ def workspace(self):
+ return self._workspace
+ def build(self):
+ return self._build
-class Worker:
+ def as_dict(self):
+ return {
+ 'worker-image': self.worker_image(),
+ 'install': self.install(),
+ 'source': self.source(),
+ 'workspace': self.workspace(),
+ 'build': self.build(),
+ }
- def __init__(self, manager, ip):
- self._manager = manager
- self._target = 'worker@{}'.format(ip)
+ def _get(self, spec, key, default=None):
+ v = spec.get(key, default)
+ if v is None:
+ raise SpecMissingKey(key)
+ return v
- def ssh(self, argv, quiet=False):
- argv = [shlex.quote(arg) for arg in argv]
- argv = ['ssh', self._target, '--'] + argv
- return self._manager.ssh(argv, quiet=quiet)
- def install_packages(self, pkgs):
- return self.ssh(
- ['sudo', 'DEBIAN_FRONTEND=noninteractive',
- 'apt-get', '-y', 'install'] + pkgs)
+class SpecMissingKey(Exception):
- def build(self, shell_text):
- shell_text = 'cd /workspace/src\n' + shell_text
- return self.ssh(['sh', '-euxc', shell_text])
+ def __init__(self, key):
+ super().__init__(
+ 'Build specification is missing required key {!r}'.format(key))
class Timer:
diff --git a/funcs.py b/funcs.py
index ada6e7d..ecc7d81 100644
--- a/funcs.py
+++ b/funcs.py
@@ -79,7 +79,6 @@ def run_contractor_dump(ctx, filename=None):
# Run the contractor to do a build.
def run_contractor_build(ctx, filename=None):
- pass
argv = _contractor() + ['build', filename]
_run(ctx, argv)
diff --git a/worker.yml b/worker.yml
index 378d0b1..7551a56 100644
--- a/worker.yml
+++ b/worker.yml
@@ -45,10 +45,10 @@
- file:
state: directory
path: /workspace
- - lineinfile:
- path: /etc/fstab
- regexp: '^/dev/vdb'
- line: '/dev/vdb /workspace ext4 defaults 0 2'
+ # - lineinfile:
+ # path: /etc/fstab
+ # regexp: '^/dev/vdb'
+ # line: '/dev/vdb /workspace ext4 defaults 0 2'
vars:
host: worker
ansible_python_interpreter: /usr/bin/python3