From fab718970dc04fdb2578506fe3c83aa20bdfea6e Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Sat, 30 Dec 2017 18:29:37 +0200 Subject: Add: built-in archive action to worker-manager --- worker_manager | 97 ++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 81 insertions(+), 16 deletions(-) (limited to 'worker_manager') diff --git a/worker_manager b/worker_manager index 8642036..408e1b0 100755 --- a/worker_manager +++ b/worker_manager @@ -18,6 +18,8 @@ import base64 import json import logging +import os +import tempfile import time import apifw @@ -110,6 +112,7 @@ class ContainerAPI: self._name = name self._url = url self._token_generator = token_generator + self._httpapi = HttpApi() def register(self): logging.info('Registering worker %s to %s', self._name, self._url) @@ -118,14 +121,12 @@ class ContainerAPI: body = { 'worker': self._name, } - httpapi = HttpApi() - httpapi.post(url, headers, body) + self._httpapi.post(url, headers, body) def get_work(self): url = self.url('/work/{}'.format(self._name)) headers = self.get_auth_headers() - httpapi = HttpApi() - work = httpapi.get(url, headers) + work = self._httpapi.get(url, headers) if work: logging.info('Response: %r', work) return work @@ -134,12 +135,17 @@ class ContainerAPI: logging.info('POST %s', work) url = self.url('/work') headers = self.get_auth_headers() - httpapi = HttpApi() - code = httpapi.post(url, headers, work) + code = self._httpapi.post(url, headers, work) if code not in (200, 201): raise cliapp.AppException( 'Error posting data to controller: {}'.format(code)) + def upload_blob(self, blob_id, blob): + logging.info('Upload blob %s', blob_id) + url = self.url('/blobs/{}'.format(blob_id)) + headers = self.get_auth_headers() + code = self._httpapi.put(url, headers, blob) + def url(self, path): return '{}{}'.format(self._url, path) @@ -152,16 +158,23 @@ class ContainerAPI: class HttpApi: + def __init__(self): + self._session = requests.Session() + def post(self, url, headers, body): - r = requests.post(url, json=body, headers=headers, verify=False) + r = self._session.post(url, json=body, headers=headers, verify=False) if not r.ok: logging.warning('Error: POST %s returned %s', url, r.status_code) - else: - logging.info('Registered worker successfully') + return r.status_code + + def put(self, url, headers, body): + r = self._session.put(url, data=body, headers=headers, verify=False) + if not r.ok: + logging.warning('Error: PUT %s returned %s', url, r.status_code) return r.status_code def get(self, url, headers): - r = requests.get(url, headers=headers, verify=False) + r = self._session.get(url, headers=headers, verify=False) if not r.ok or not r.text: return None return r.json() @@ -177,6 +190,8 @@ class TokenGenerator: 'uapi_work_id_get', 'uapi_work_post', 'uapi_workers_post', + 'uapi_blobs_id_get', + 'uapi_blobs_id_put', ]) def __init__(self): @@ -236,7 +251,7 @@ class Worker: def post(stream_name, data): s = self.get_work_result(work) - s[stream_name] = data.decode('UTF-8') + s[stream_name] = data self._api.report_work(s) step = work['step'] @@ -244,14 +259,14 @@ class Worker: exit_code = 0 if work.get('fresh_workspace'): logging.info('Make an empty workspace') - cleaner = WorkspaceCleaner(self._workspace, post) + cleaner = WorkspaceCleaner(None, self._workspace, post) exit_code = cleaner.do(work) if exit_code == 0: klass = self.worker_factory(step) if klass is None: exit_code = -1 else: - worker = klass(self._workspace, post) + worker = klass(self._api, self._workspace, post) exit_code = worker.do(work) self.finish_work(work, exit_code) @@ -277,6 +292,9 @@ class Worker: return PythonWorker elif 'debootstrap' in step: return DebootstrapWorker + elif 'archive' in step: + return WorkspaceArchiver + logging.warning('Cannot find worker for %s', step) return None def finish_work(self, work, exit_code): @@ -287,7 +305,8 @@ class Worker: class WorkerBase: - def __init__(self, workspace, post): + def __init__(self, api, workspace, post): + self._api = api self._workspace = workspace self._post = post @@ -303,12 +322,18 @@ class WorkerBase: logging.debug('running: %r', argv) exit_code, _, _ = cliapp.runcmd_unchecked( argv, - stdout_callback=lambda data: self._post('stdout', data), - stderr_callback=lambda data: self._post('stderr', data), + stdout_callback=self.report_stdout, + stderr_callback=self.report_stderr, cwd=self._workspace, ) return exit_code + def report_stdout(self, data): + return self._post('stdout', data.decode('UTF-8')) + + def report_stderr(self, data): + return self._post('stderr', data.decode('UTF-8')) + def params64(self, params): as_json = json.dumps(params) as_bytes = as_json.encode('UTF-8') @@ -362,5 +387,45 @@ class WorkspaceCleaner(WorkerBase): return ['sudo', 'find', '.', '-delete'] +class WorkspaceArchiver(WorkerBase): + + def do(self, work): + step = work['step'] + dirname = step['archive'] + + if dirname == 'workspace': + dirname = self._workspace + + blob_id = step['archive_name'] + tarball = self.archive(dirname) + self._api.upload_blob(blob_id, tarball) + + return 0 + + def archive(self, dirname): + logging.info('Archiving %s', dirname) + fd, filename = tempfile.mkstemp() + os.close(fd) + argv = ['sudo', 'tar', '-zcvf', filename, '-C', dirname, '.'] + exit_code, _, _ = cliapp.runcmd_unchecked( + argv, + stdout_callback=self.report, + stderr_callback=self.report, + cwd=self._workspace, + ) + self.report(b'Created tarball of workspace') + if exit_code == 0: + with open(filename, 'rb') as f: + return f.read() + os.remove(filename) + return exit_code + + def report(self, data): + self._post('stdout', data.decode('UTF-8')) + + def get_argv(self, work, params_text): + assert False + + if __name__ == '__main__': WorkerManager(version=ick2.__version__).run() -- cgit v1.2.1