From c4b8956781746eb829792be8c9d03015ca3f010d Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Mon, 22 Jan 2018 15:10:14 +0200 Subject: Change: do workspace archive + upload via pipe This avoids having a huge blob in memory. --- worker_manager | 61 +++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 16 deletions(-) (limited to 'worker_manager') diff --git a/worker_manager b/worker_manager index a9d61a5..da7b23d 100755 --- a/worker_manager +++ b/worker_manager @@ -331,7 +331,7 @@ class Runner: def __init__(self, post_output): self._post = post_output self._buffers = {'stdout': '', 'stderr': ''} - self._maxbuf = 2**10 + self._maxbuf = 1024 * 256 self._timeout = 1.0 def runcmd(self, argv, **kwargs): @@ -465,38 +465,67 @@ class WorkspaceArchiver(WorkerBase): def do(self, work): params = work.get('parameters', {}) + blob_id = params.get('systree_name', 'noname') step = work['step'] dirname = step['archive'] if dirname == 'workspace': dirname = self._workspace - blob_id = params.get('systree_name', 'noname') - tarball = self.archive(dirname) - self._api.upload_blob(blob_id, tarball) + runner = Runner(self._post) + + temp = self.archive(runner, dirname) + if isinstance(temp, int): + return temp - return 0 + result = self.upload(runner, temp, blob_id) + os.remove(temp) - def archive(self, dirname): + return result + + def archive(self, runner, dirname): logging.info('Archiving %s', dirname) + self.report(b'Archiving workspace\n') + fd, filename = tempfile.mkstemp() os.close(fd) - argv = ['sudo', 'tar', '-zcvf', filename, '-C', dirname, '.'] + + archive = ['sudo', 'tar', '-zcf', filename, '-C', dirname, '.'] + exit_code = runner.runcmd(archive) + if exit_code != 0: + os.remove(filename) + return exit_code + + return filename + + def upload(self, runner, filename, blob_id): + logging.info('Uploading %s', blob_id) + self.report(b'Uploading workspace\n') + + url = self._api.url('/blobs/{}'.format(blob_id)) + headers = self._api.get_auth_headers() + upload = ['curl', '-sk', '-T', filename] + for name, value in headers.items(): + header = '{}: {}'.format(name, value) + upload.extend(['-H', header]) + upload.append(url) + + exit_code = runner.runcmd(upload) + if exit_code == 0: + self.report(b'Uploaded tarball of workspace\n') + + return exit_code + + def execute_argv(self, *argvs, **kwargs): exit_code, _, _ = cliapp.runcmd_unchecked( - argv, - stdout_callback=self.report, - stderr_callback=self.report, - cwd=self._workspace, + *argvs, + **kwargs, ) - self.report(b'Created tarball of workspace') - if exit_code == 0: - with open(filename, 'rb') as f: - return f.read() - os.remove(filename) return exit_code def report(self, data): self._post('stdout', data.decode('UTF-8')) + return b'' def get_argv(self, work, params_text): assert False -- cgit v1.2.1