From 6582e63e1d4efec128bcf8c0d7103643abc8650d Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Fri, 30 Mar 2018 10:02:47 +0300 Subject: Change: worker_manager to use new classes in ick2 --- worker_manager | 465 ++++++--------------------------------------------------- 1 file changed, 44 insertions(+), 421 deletions(-) (limited to 'worker_manager') diff --git a/worker_manager b/worker_manager index ec8c19f..ad01fd7 100755 --- a/worker_manager +++ b/worker_manager @@ -15,12 +15,7 @@ # along with this program. If not, see . -import base64 -import json import logging -import os -import subprocess -import tempfile import time import apifw @@ -88,16 +83,25 @@ class WorkerManager(cliapp.Application): ) def process_args(self, args): + try: + self.main_program(args) + except BaseException as e: + logging.error(str(e), exc_info=True) + raise + + def main_program(self, args): self.settings.require('name') self.settings.require('controller') name = self.settings['name'] url = self.settings['controller'] + workspace = self.settings['workspace'] + systree = self.settings['systree'] + tg = TokenGenerator() tg.set_key(self.settings['token-key']) api = ControllerAPI(name, url, tg) - worker = Worker( - name, api, self.settings['workspace'], self.settings['systree']) + worker = Worker(name, api, workspace, systree) logging.info('Worker manager %s starts, controller is %s', name, url) @@ -117,94 +121,43 @@ class WorkerManager(cliapp.Application): class ControllerAPI: def __init__(self, name, url, token_generator): - self._name = name - self._url = url - self._blob_url = None self._token_generator = token_generator - self._httpapi = HttpApi() self._cc = ick2.ControllerClient() self._cc.set_client_name(name) self._cc.set_controller_url(url) + self._blobs = None + + def get_token(self): + return self._token_generator.get_token() def register(self): - self._cc.set_token(self._token_generator.get_token()) + self._cc.set_token(self.get_token()) self._cc.register() def get_work(self): - self._cc.set_token(self._token_generator.get_token()) + self._cc.set_token(self.get_token()) return self._cc.get_work() def report_work(self, work): - self._cc.set_token(self._token_generator.get_token()) + self._cc.set_token(self.get_token()) self._cc.report_work(work) - def url(self, path): - return '{}{}'.format(self._url, path) - - def get_auth_headers(self): - token = self._token_generator.get_token() - return { - 'Authorization': 'Bearer {}'.format(token), - } + def get_blob_upload_url(self, name): + blobs = self.get_blob_client() + return blobs.url(name) def upload_blob(self, blob_id, blob): - logging.info('Upload blob %s', blob_id) - url = self.bloburl(blob_id) - headers = self.get_auth_headers() - code = self._httpapi.put(url, headers, blob) + blobs = self.get_blob_client() + blobs.upload(blob_id, blob) def download_blob(self, blob_id): - logging.info('Download blob %s', blob_id) - url = self.bloburl(blob_id) - headers = self.get_auth_headers() - return self._httpapi.get_blob(url, headers) - - def bloburl(self, blob_id): - if self._blob_url is None: - self._blob_url = self.get_artifact_store_url() - if self._blob_url is not None: - return '{}/blobs/{}'.format(self._blob_url, blob_id) - logging.error('Do not artifact store URL') - return None - - def get_artifact_store_url(self): - url = self.url('/version') - headers = self.get_auth_headers() - version = self._httpapi.get(url, headers) - logging.info('Version: %r', version) - if version: - return version.get('artifact_store') - - -class HttpApi: - - def __init__(self): - self._httpapi = ick2.HttpAPI() - - def post(self, url, headers, body): - ret = self._request(self._httpapi.post, url, headers, body=body) - if ret is None: - return 400 - return 201 + blobs = self.get_blob_client() + return blobs.download(blob_id) - def put(self, url, headers, body): - ret = self._request(self._httpapi.put, url, headers, body=body) - if ret is None: - return 400 - return 200 - - def get(self, url, headers): - return self._request(self._httpapi.get_dict, url, headers) - - def get_blob(self, url, headers): - return self._request(self._httpapi.get_blob, url, headers) - - def _request(self, func, url, headers, **kwargs): - try: - return func(url, headers=headers, **kwargs) - except ick2.HttpError as e: - logging.warning('Error: %s returned %s', url, e) - return None + def get_blob_client(self): + if self._blobs is None: + self._blobs = self._cc.get_blob_client() + return self._blobs class TokenGenerator: @@ -277,353 +230,23 @@ class Worker: self._systree = systree def do_work(self, work): - - def post(stream_name, data): - s = self.get_work_result(work) - s[stream_name] = data - self._api.report_work(s) - - step = work['step'] - logging.info('Running step: %r', step) - exit_code = 0 - if exit_code == 0: - klass = self.worker_factory(step) - if klass is None: - exit_code = -1 - else: - worker = klass(self._api, self._workspace, self._systree, post) - exit_code = worker.do(work) - self.finish_work(work, exit_code) - - def get_work_result(self, work): - return { - 'build_id': work['build_id'], - 'worker': self._name, - 'project': work['project'], - 'pipeline': work['pipeline'], - 'stdout': '', - 'stderr': '', - 'exit_code': None, - 'timestamp': self.now(), - } - - def now(self): - return time.strftime('%Y-%m-%dT%H:%M:%S') - - def worker_factory(self, step): - table = [ - ('shell', None, ShellWorker), - ('python', None, PythonWorker), - ('debootstrap', None, DebootstrapWorker), - ('archive', None, WorkspaceArchiver), - ('action', 'populate_systree', SystreePopulator), - ('action', 'create_workspace', WorkspaceCleaner), - ] - - for key, value, klass in table: - if key in step and (value is None or step[key] == value): - return klass - - logging.warning('Cannot find worker for %s', step) - return None - - def finish_work(self, work, exit_code): - s = self.get_work_result(work) - s['exit_code'] = exit_code - self._api.report_work(s) - - -class Runner: - - def __init__(self, post_output): - self._post = post_output - self._buffers = {'stdout': '', 'stderr': ''} - self._maxbuf = 1024 * 256 - self._timeout = 1.0 - - def runcmd(self, argv, **kwargs): - logging.debug('Runner.runcmd: argv=%r %r', argv, kwargs) - exit_code, _, _ = cliapp.runcmd_unchecked( - argv, - stdout_callback=self.capture_stdout, - stderr=subprocess.STDOUT, - output_timeout=self._timeout, - timeout_callback=self.flush, - **kwargs - ) - self.flush() - logging.debug('Runner.runcmd: finished, exit_code=%r', exit_code) - return exit_code - - def capture_stdout(self, data): - return self.capture('stdout', data) - - def capture_stderr(self, data): - return self.capture('stderr', data) - - def capture(self, stream_name, data): - logging.debug('CAPTURE %s: %r', stream_name, data) - self._buffers[stream_name] += data.decode('UTF-8') - if len(self._buffers[stream_name]) >= self._maxbuf: - self.flush() - return b'' - - def flush(self): - logging.debug('flushing: self._buffers=%r', self._buffers) - streams = list(self._buffers.keys()) - logging.debug('streams: %r', streams) - for stream in streams: - buf = self._buffers[stream] - logging.debug('FLUSH: %s: %r', stream, buf) - self._buffers[stream] = '' - if buf: - logging.debug('Posting %d bytes of %s', len(buf), stream) - self._post(stream, buf) - - -class Mounter: - - def __init__(self, mounts, runner): - self._mounts = mounts - self._runner = runner - - def __enter__(self): - self.mount() - return self - - def __exit__(self, *args): - self.unmount() - - def mount(self): - for dirname, mp in self._mounts: - if not os.path.exists(mp): - os.mkdir(mp) - self._runner.runcmd(['sudo', 'mount', '--bind', dirname, mp]) - - def unmount(self): - for dirname, mp in reversed(self._mounts): - self._runner.runcmd(['sudo', 'umount', mp]) - - -class WorkerBase: - - def __init__(self, api, workspace, systree, post): - self._api = api - self._workspace = workspace - self._systree = systree - self._post = post - - def do(self, work): - params = work.get('parameters', {}) - params_text = self.params64(params) - argv = self.get_argv(work, params_text) - mounts = [] - env = dict(os.environ) - env.update({ - 'LC_ALL': 'C', - 'DEBIAN_FRONTEND': 'noninteractive', - }) - if self.where(work) == 'chroot': - logging.debug('CHROOT REQUESTED') - argv = ['sudo', 'chroot', self._workspace] + argv - mounts = [ - ('/proc', os.path.join(self._workspace, 'proc')), - ('/sys', os.path.join(self._workspace, 'sys')), - ] - elif self.where(work) == 'container': - logging.debug('CONTAINER REQUESTED') - bind = '{}:/workspace'.format(self._workspace) - mp = '{}/workspace'.format(self._workspace) - if not os.path.exists(mp): - os.mkdir(mp) - argv = [ - 'sudo', 'systemd-nspawn', '-D', self._systree, '--bind', bind, - '--chdir', '/workspace', - ] + argv - else: - logging.debug('HOST REQUESTED') - runner = Runner(self._post) - with Mounter(mounts, runner): - return runner.runcmd(argv, cwd=self._workspace, env=env) - - def params64(self, params): - as_json = json.dumps(params) - as_bytes = as_json.encode('UTF-8') - as_base64 = base64.b64encode(as_bytes) - return as_base64.decode('UTF-8') - - def where(self, work): + logging.debug('Doing work: %r', work) + project_name = work['project'] step = work.get('step', {}) - return step.get('where') - - def get_argv(self, work, params_text): - raise NotImplementedError() - - -class ShellWorker(WorkerBase): - - def get_argv(self, work, params_text): - step = work['step'] - code_snippet = step['shell'] - where = step.get('where', 'host') - prefix = 'params() { echo -n "%s" | base64 -d; }\n' % params_text - return ['bash', '-exuc', prefix + code_snippet] - - -class PythonWorker(WorkerBase): - - def get_argv(self, work, params_text): - prefix = ( - 'import base64, json\n' - 'params = json.loads(base64.b64decode(\n' - ' "{}").decode("utf8"))\n' - ).format(params_text) - code_snippet = work['step']['python'] - return ['python3', '-c', prefix + '\n' + code_snippet] - - -class DebootstrapWorker(WorkerBase): - - def get_argv(self, work, params_text): - step = work['step'] - params = work.get('parameters', {}) - if step['debootstrap'] == 'auto': - suite = params['debian_codename'] - else: - suite = step['debootstrap'] - - return [ - 'sudo', - 'debootstrap', - suite, - '.', - step.get('mirror', 'http://deb.debian.org/debian'), - ] - - -class WorkspaceCleaner(WorkerBase): - - def get_argv(self, work, params_text): - return ['sudo', 'find', '.', '-delete'] - - -class WorkspaceArchiver(WorkerBase): - - def do(self, work): + logging.debug('Doing work: step=%r', step) params = work.get('parameters', {}) - blob_id = params.get('systree_name', 'noname') - step = work['step'] - dirname = step['archive'] - - if dirname == 'workspace': - dirname = self._workspace - - runner = Runner(self._post) - - temp = self.archive(runner, dirname) - if isinstance(temp, int): - return temp - - result = self.upload(runner, temp, blob_id) - os.remove(temp) - - return result - - def archive(self, runner, dirname): - logging.info('Archiving %s', dirname) - self.report(b'Archiving workspace\n') - - fd, filename = tempfile.mkstemp() - os.close(fd) - - archive = ['sudo', 'tar', '-zcf', filename, '-C', dirname, '.'] - exit_code = runner.runcmd(archive) - if exit_code != 0: - os.remove(filename) - return exit_code - - return filename - - def upload(self, runner, filename, blob_id): - logging.info('Uploading %s', blob_id) - self.report(b'Uploading workspace\n') - - url = self._api.bloburl(blob_id) - headers = self._api.get_auth_headers() - upload = ['curl', '-sk', '-T', filename] - for name, value in headers.items(): - header = '{}: {}'.format(name, value) - upload.extend(['-H', header]) - upload.append(url) - - exit_code = runner.runcmd(upload) - if exit_code == 0: - self.report(b'Uploaded tarball of workspace\n') - - return exit_code - - def execute_argv(self, *argvs, **kwargs): - exit_code, _, _ = cliapp.runcmd_unchecked( - *argvs, - **kwargs, - ) - return exit_code - - def report(self, data): - self._post('stdout', data.decode('UTF-8')) - return b'' - - def get_argv(self, work, params_text): - assert False - - -class SystreePopulator(WorkerBase): - - systree_dir = '/var/lib/ick/systree' - - def do(self, work): - step = work['step'] - systree_name = step.get('systree_name') - if not systree_name: - self.report( - b'No systree_name field in action, no systree population\n') - return 1 - - self.make_directory_empty(self.systree_dir) - - self.report(b'Downloading and unpacking systree blob\n') - return self.download_and_unpack_systree(systree_name, self.systree_dir) - - def make_directory_empty(self, dirname): - return self.execute_argv(['sudo', 'find', '-delete'], cwd=dirname) - - def download_and_unpack_systree(self, systree_name, dirname): - url = self._api.bloburl(systree_name) - headers = self._api.get_auth_headers() - download = ['curl', '-k'] - for name, value in headers.items(): - header = '{}: {}'.format(name, value) - download.extend(['-H', header]) - download.append(url) - - unpack = ['sudo', 'tar', '-zxf', '-', '-C', dirname] - - return self.execute_argv(download, unpack) - - def execute_argv(self, *argvs, **kwargs): - exit_code, _, _ = cliapp.runcmd_unchecked( - *argvs, - stdout_callback=self.report, - stderr_callback=self.report, - **kwargs, - ) - return exit_code - - def report(self, data): - self._post('stdout', data.decode('UTF-8')) - - def get_argv(self, work, params_text): - assert False + logging.debug('Doing work: params=%r', params) + reporter = ick2.Reporter(self._api, work) + logging.debug('Doing work: reporter=%r', reporter) + af = ick2.ActionFactory(self._systree, self._workspace, reporter) + logging.debug('Doing work: af=%r', af) + af.set_token(self._api.get_token()) + af.set_blob_url_func(self._api.get_blob_upload_url) + action = af.create_action(step, project_name) + logging.debug('Doing work: action=%r', action) + exit_code = action.execute(params, step) + logging.debug('Action finished: exit_code=%r', exit_code) + logging.debug('Finished work: %r', work) if __name__ == '__main__': -- cgit v1.2.1