From ab1016a4e73169a0f54674aea854251074c7a44f Mon Sep 17 00:00:00 2001
From: Lars Wirzenius
Date: Sun, 14 Jan 2018 19:25:51 +0200
Subject: Refactor: worker manager buffers output

This should be good for large outputs.
---
Review note: Runner.flush() below was amended during review. It ended its
loop body with "return self._post(stream, buf)", which left the method
after posting the first non-empty buffer, so when both stdout and stderr
held data only one of them was posted (and on the final flush in
Runner.runcmd() the other was never sent). The call is now made without
"return", so every non-empty buffer is posted and flush() returns None.
The number of added lines is unchanged, so the hunk headers and the
diffstat still apply; the blob id on the "index" line predates this
amendment.

 worker_manager | 84 ++++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 61 insertions(+), 23 deletions(-)

(limited to 'worker_manager')

diff --git a/worker_manager b/worker_manager
index 989ae21..1cc012e 100755
--- a/worker_manager
+++ b/worker_manager
@@ -303,18 +303,19 @@ class Worker:
         return time.strftime('%Y-%m-%dT%H:%M:%S')
 
     def worker_factory(self, step):
-        if 'shell' in step:
-            return ShellWorker
-        elif 'python' in step:
-            return PythonWorker
-        elif 'debootstrap' in step:
-            return DebootstrapWorker
-        elif 'archive' in step:
-            return WorkspaceArchiver
-        elif step.get('action') == 'populate_systree':
-            return SystreePopulator
-        elif step.get('action') == 'create_workspace':
-            return WorkspaceCleaner
+        table = [
+            ('shell', None, ShellWorker),
+            ('python', None, PythonWorker),
+            ('debootstrap', None, DebootstrapWorker),
+            ('archive', None, WorkspaceArchiver),
+            ('action', 'populate_systree', SystreePopulator),
+            ('action', 'create_workspace', WorkspaceCleaner),
+        ]
+
+        for key, value, klass in table:
+            if key in step and (value is None or step[key] == value):
+                return klass
+
         logging.warning('Cannot find worker for %s', step)
         return None
 
@@ -324,6 +325,48 @@ class Worker:
         self._api.report_work(s)
 
 
+class Runner:
+
+    def __init__(self, post_output):
+        self._post = post_output
+        self._buffers = {'stdout': '', 'stderr': ''}
+        self._maxbuf = 2**10
+        self._timeout = 1.0
+
+    def runcmd(self, argv, **kwargs):
+        logging.debug('Runner.runcmd: argv=%r %r', argv, kwargs)
+        exit_code, _, _ = cliapp.runcmd_unchecked(
+            argv,
+            stdout_callback=self.capture_stdout,
+            stderr_callback=self.capture_stderr,
+            timeout_callback=self.flush,
+            **kwargs
+        )
+        self.flush()
+        logging.debug('Runner.runcmd: finished, exit_code=%r', exit_code)
+        return exit_code
+
+    def capture_stdout(self, data):
+        return self.capture('stdout', data)
+
+    def capture_stderr(self, data):
+        return self.capture('stderr', data)
+
+    def capture(self, stream_name, data):
+        self._buffers[stream_name] += data.decode('UTF-8')
+        if len(self._buffers[stream_name]) >= self._maxbuf:
+            self.flush()
+        return None
+
+    def flush(self):
+        for stream in self._buffers:
+            buf = self._buffers[stream]
+            self._buffers[stream] = ''
+            if buf:
+                logging.debug('Posting %d bytes of %s', len(buf), stream)
+                self._post(stream, buf)
+
+
 class WorkerBase:
 
     def __init__(self, api, workspace, systree, post):
@@ -342,18 +385,13 @@ class WorkerBase:
         elif self.where(work) == 'container':
             logging.debug('CONTAINER REQUESTED')
             argv = [
-                'sudo', 'systemd-nspawn', '-D', self._systree, '--bind', self._workspace,
+                'sudo', 'systemd-nspawn', '-D', self._systree,
+                '--bind', self._workspace,
             ] + argv
         else:
             logging.debug('HOST REQUESTED')
-        logging.debug('running: %r', argv)
-        exit_code, _, _ = cliapp.runcmd_unchecked(
-            argv,
-            stdout_callback=self.report_stdout,
-            stderr_callback=self.report_stderr,
-            cwd=self._workspace,
-        )
-        return exit_code
+        runner = Runner(self._post)
+        return runner.runcmd(argv, cwd=self._workspace)
 
     def report_stdout(self, data):
         return self._post('stdout', data.decode('UTF-8'))
@@ -466,14 +504,14 @@ class SystreePopulator(WorkerBase):
         systree_name = step.get('systree_name')
         if not systree_name:
             self.report(
-                b'No systree_name field in action, no systree population')
+                b'No systree_name field in action, no systree population\n')
             return 1
 
         self.make_directory_empty(self.systree_dir)
 
         tarball = self._api.download_blob(systree_name)
         self.unpack_systree(tarball, self.systree_dir)
-        self.report(b'Systree has been populated')
+        self.report(b'Systree has been populated\n')
         return 0
 
     def make_directory_empty(self, dirname):
-- 
cgit v1.2.1