summaryrefslogtreecommitdiff
path: root/worker_manager
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2018-01-14 19:25:51 +0200
committerLars Wirzenius <liw@liw.fi>2018-01-14 22:25:02 +0200
commitab1016a4e73169a0f54674aea854251074c7a44f (patch)
tree8cd5235db72b8e28417e09f9d6930b92d97fc78a /worker_manager
parent2477c5bf213ae311b1bf812f3097a26d8d9ead4c (diff)
downloadick2-ab1016a4e73169a0f54674aea854251074c7a44f.tar.gz
Refactor: worker manager buffers output
This should be good for large outputs.
Diffstat (limited to 'worker_manager')
-rwxr-xr-xworker_manager84
1 files changed, 61 insertions, 23 deletions
diff --git a/worker_manager b/worker_manager
index 989ae21..1cc012e 100755
--- a/worker_manager
+++ b/worker_manager
@@ -303,18 +303,19 @@ class Worker:
return time.strftime('%Y-%m-%dT%H:%M:%S')
def worker_factory(self, step):
- if 'shell' in step:
- return ShellWorker
- elif 'python' in step:
- return PythonWorker
- elif 'debootstrap' in step:
- return DebootstrapWorker
- elif 'archive' in step:
- return WorkspaceArchiver
- elif step.get('action') == 'populate_systree':
- return SystreePopulator
- elif step.get('action') == 'create_workspace':
- return WorkspaceCleaner
+ table = [
+ ('shell', None, ShellWorker),
+ ('python', None, PythonWorker),
+ ('debootstrap', None, DebootstrapWorker),
+ ('archive', None, WorkspaceArchiver),
+ ('action', 'populate_systree', SystreePopulator),
+ ('action', 'create_workspace', WorkspaceCleaner),
+ ]
+
+ for key, value, klass in table:
+ if key in step and (value is None or step[key] == value):
+ return klass
+
logging.warning('Cannot find worker for %s', step)
return None
@@ -324,6 +325,48 @@ class Worker:
self._api.report_work(s)
+class Runner:
+
+ def __init__(self, post_output):
+ self._post = post_output
+ self._buffers = {'stdout': '', 'stderr': ''}
+ self._maxbuf = 2**10
+ self._timeout = 1.0
+
+ def runcmd(self, argv, **kwargs):
+ logging.debug('Runner.runcmd: argv=%r %r', argv, kwargs)
+ exit_code, _, _ = cliapp.runcmd_unchecked(
+ argv,
+ stdout_callback=self.capture_stdout,
+ stderr_callback=self.capture_stderr,
+ timeout_callback=self.flush,
+ **kwargs
+ )
+ self.flush()
+ logging.debug('Runner.runcmd: finished, exit_code=%r', exit_code)
+ return exit_code
+
+ def capture_stdout(self, data):
+ return self.capture('stdout', data)
+
+ def capture_stderr(self, data):
+ return self.capture('stderr', data)
+
+ def capture(self, stream_name, data):
+ self._buffers[stream_name] += data.decode('UTF-8')
+ if len(self._buffers[stream_name]) >= self._maxbuf:
+ self.flush()
+ return None
+
+ def flush(self):
+ for stream in self._buffers:
+ buf = self._buffers[stream]
+ self._buffers[stream] = ''
+ if buf:
+ logging.debug('Posting %d bytes of %s', len(buf), stream)
+ return self._post(stream, buf)
+
+
class WorkerBase:
def __init__(self, api, workspace, systree, post):
@@ -342,18 +385,13 @@ class WorkerBase:
elif self.where(work) == 'container':
logging.debug('CONTAINER REQUESTED')
argv = [
- 'sudo', 'systemd-nspawn', '-D', self._systree, '--bind', self._workspace,
+ 'sudo', 'systemd-nspawn', '-D', self._systree,
+ '--bind', self._workspace,
] + argv
else:
logging.debug('HOST REQUESTED')
- logging.debug('running: %r', argv)
- exit_code, _, _ = cliapp.runcmd_unchecked(
- argv,
- stdout_callback=self.report_stdout,
- stderr_callback=self.report_stderr,
- cwd=self._workspace,
- )
- return exit_code
+ runner = Runner(self._post)
+ return runner.runcmd(argv, cwd=self._workspace)
def report_stdout(self, data):
return self._post('stdout', data.decode('UTF-8'))
@@ -466,14 +504,14 @@ class SystreePopulator(WorkerBase):
systree_name = step.get('systree_name')
if not systree_name:
self.report(
- b'No systree_name field in action, no systree population')
+ b'No systree_name field in action, no systree population\n')
return 1
self.make_directory_empty(self.systree_dir)
tarball = self._api.download_blob(systree_name)
self.unpack_systree(tarball, self.systree_dir)
- self.report(b'Systree has been populated')
+ self.report(b'Systree has been populated\n')
return 0
def make_directory_empty(self, dirname):