From 4fddaccccdd090fa834c0f7b272ce627ff155888 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Wed, 27 Dec 2017 16:01:08 +0200 Subject: Refactor: worker_manager for cleanliness --- worker_manager | 413 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 230 insertions(+), 183 deletions(-) (limited to 'worker_manager') diff --git a/worker_manager b/worker_manager index 92c2ad4..24214bd 100755 --- a/worker_manager +++ b/worker_manager @@ -18,7 +18,6 @@ import base64 import json import logging -import sys import time import apifw @@ -26,7 +25,6 @@ import cliapp import Crypto.PublicKey.RSA import requests import urllib3 - import ick2 @@ -42,11 +40,6 @@ class WorkerManager(cliapp.Application): self._token_until = None def add_settings(self): - self.settings.boolean( - ['pretend'], - 'only pretend to execute shell steps', - ) - self.settings.string( ['controller'], 'base URL for the controller', @@ -59,12 +52,6 @@ class WorkerManager(cliapp.Application): metavar='URL', ) - self.settings.string( - ['token'], - 'use TOKEN for all controller HTTP requests', - metavar='TOKEN', - ) - self.settings.string( ['token-key'], 'get token signing private key from FILE', @@ -73,8 +60,8 @@ class WorkerManager(cliapp.Application): self.settings.string( ['token-key-pub'], - 'get token signing public key from FILE', - metavar='FILE', + 'this is not used', + metavar='NOPE', ) self.settings.integer( @@ -93,121 +80,185 @@ class WorkerManager(cliapp.Application): def process_args(self, args): self.settings.require('name') + self.settings.require('controller') + name = self.settings['name'] url = self.settings['controller'] - self.show_msg( - 'Worker manager {} starts, controller is {}'.format(name, url)) - self.register(url, name) + tg = TokenGenerator() + tg.set_key(self.settings['token-key']) + api = ContainerAPI(name, url, tg) + worker = Worker(name, api, self.settings['workspace']) + + logging.info('Worker manager %s starts, controller is %s', name, url) + + api.register() while True: - work = self.get_work(url, name) - if work and self.settings['pretend']: - self.report_pretend_work(url, name, work) - elif work: - self.do_work(url, name, work, self.settings['workspace']) + work = api.get_work() + if work: + worker.do_work(work) else: self.sleep_a_little() - def register(self, url, name): - logging.info('Registering worker %s to %s', name, url) - register_url = '{}/workers'.format(url) - headers = self.get_auth_headers() - body = { - 'worker': name, - } - r = requests.post( - register_url, json=body, headers=headers, verify=False) - if not r.ok: - logging.warning('Error registering worker: %d', r.status_code) - else: - logging.info('Registered worker successfully') - def sleep_a_little(self): secs = self.settings['sleep'] time.sleep(secs) - def get_work(self, url, name): - get_work_url = '{}/work/{}'.format(url, name) + +class ContainerAPI: + + def __init__(self, name, url, token_generator): + self._name = name + self._url = url + self._token_generator = token_generator + + def register(self): + logging.info('Registering worker %s to %s', self._name, self._url) + url = self.url('/workers') headers = self.get_auth_headers() - r = requests.get(get_work_url, headers=headers, verify=False) - if r.status_code != 200 or not r.text: - return None - work = r.json() + body = { + 'worker': self._name, + } + httpapi = HttpApi() + httpapi.post(url, headers, body) + + def get_work(self): + url = self.url('/work/{}'.format(self._name)) + headers = self.get_auth_headers() + httpapi = HttpApi() + work = httpapi.get(url, headers) if work: - self.show_json('Response:', work) + logging.info('Response: %r', work) return work + def report_work(self, work): + logging.info('POST %s', work) + url = self.url('/work') + headers = self.get_auth_headers() + httpapi = HttpApi() + code = httpapi.post(url, headers, work) + if code not in (200, 201): + raise cliapp.AppException( + 'Error posting data to controller: {}'.format(code)) + + def url(self, path): + return '{}{}'.format(self._url, path) + def get_auth_headers(self): - token = self.get_token() + token = self._token_generator.get_token() return { 'Authorization': 'Bearer {}'.format(token), } - def get_token(self): - token = self.settings['token'] - token_key = self.settings['token-key'] - token_key_pub = self.settings['token-key-pub'] - now = time.time() - if self._token is None or now >= self._token_until: - if token: - self._token = token - self._token_until = now + 3600 - elif token_key and token_key_pub: - with open(token_key) as f1, open(token_key_pub) as f2: - key_text = f1.read() - pub_text = f2.read() - self._token = self.create_token(key_text, pub_text) - self._token_until = now + 3600 - else: - sys.exit('No token and no way to create') +class HttpApi: - assert self._token is not None - return self._token + def post(self, url, headers, body): + r = requests.post(url, json=body, headers=headers, verify=False) + if not r.ok: + logging.warning('Error: POST %s returned %s', url, r.status_code) + else: + logging.info('Registered worker successfully') + return r.status_code - def create_token(self, key_text, pub_text): - iss = 'localhost' - aud = 'localhost' - scopes_list = [ - 'uapi_work_id_get', - 'uapi_work_post', - 'uapi_workers_post', - ] + def get(self, url, headers): + r = requests.get(url, headers=headers, verify=False) + if not r.ok or not r.text: + return None + return r.json() + + +class TokenGenerator: + + max_age = 3600 # 1 hour + sub = 'subject-uuid' + iss = 'localhost' + aud = 'localhost' + scopes = ' '.join([ + 'uapi_work_id_get', + 'uapi_work_post', + 'uapi_workers_post', + ]) + + def __init__(self): + self._token = None + self._token_key = None + self._token_until = None + + def is_valid(self, now): + return ( + self._token is not None and + (self._token_until is None or now <= self._token_until) + ) + + def set_token(self, token): + self._token = token + self._token_until = None + assert self.is_valid(time.time()) - key = Crypto.PublicKey.RSA.importKey(key_text) - scopes = ' '.join(scopes_list) + def set_key(self, filename): + key_text = self.cat(filename) + self._token_key = Crypto.PublicKey.RSA.importKey(key_text) + def cat(self, filename): + with open(filename) as f: + return f.read() + + def get_token(self): + now = time.time() + if not self.is_valid(now): + self._token = self.create_token() + self._token_until = now + self.max_age + assert self.is_valid(now) + return self._token + + def create_token(self): now = time.time() claims = { - 'iss': iss, - 'sub': 'subject-uuid', - 'aud': aud, - 'exp': now + 86400, - 'scope': scopes, + 'iss': self.iss, + 'sub': self.sub, + 'aud': self.aud, + 'exp': now + self.max_age, + 'scope': self.scopes, } - token = apifw.create_token(claims, key) + token = apifw.create_token(claims, self._token_key) return token.decode('ascii') - def report_pretend_work(self, url, name, work): - self.show_msg('Pretending to work: {!r}'.format(work)) - snippet_url = '{}/work/{}'.format(url, name) - snippet = { - 'build_id': work['build_id'], - 'worker': name, - 'project': work['project'], - 'stdout': '', - 'stderr': '', - 'exit_code': None, - 'timestamp': self.now(), - } - self.post_snippet(snippet_url, snippet) - def do_work(self, url, name, work, workspace): - self.show_msg('Doing work') - snippet_url = '{}/work'.format(url) - snippet = { +class Worker: + + def __init__(self, name, api, workspace): + self._name = name + self._api = api + self._workspace = workspace + + def do_work(self, work): + + def post(stream_name, data): + s = self.get_work_result(work) + s[stream_name] = data.decode('UTF-8') + self._api.report_work(s) + + step = work['step'] + logging.info('Running step: %r', step) + exit_code = 0 + if work.get('fresh_workspace'): + logging.info('Make an empty workspace') + cleaner = WorkspaceCleaner(self._workspace, post) + exit_code = cleaner.do(work) + if exit_code == 0: + klass = self.worker_factory(step) + if klass is None: + exit_code = -1 + else: + worker = klass(self._workspace, post) + exit_code = worker.do(work) + self.finish_work(work, exit_code) + + def get_work_result(self, work): + return { 'build_id': work['build_id'], - 'worker': name, + 'worker': self._name, 'project': work['project'], 'pipeline': work['pipeline'], 'stdout': '', @@ -216,95 +267,91 @@ class WorkerManager(cliapp.Application): 'timestamp': self.now(), } - def post(stream_name, data): - data = data.decode('UTF-8') - s = dict(snippet) - s[stream_name] = data - self.post_snippet(snippet_url, s) - - def run_with_interp(argv_prefix, cmd): - exit_code, _, _ = cliapp.runcmd_unchecked( - argv_prefix + [cmd], - stdout_callback=lambda data: post('stdout', data), - stderr_callback=lambda data: post('stderr', data), - cwd=workspace, - ) - return exit_code - - def run_shell(shell_cmd): - params = work.get('parameters', {}) - params_json = json.dumps(params) - params_text = base64.b64encode( - params_json.encode('utf8')).decode('utf8') - prefix = [ - 'params() { echo -n "%s" | base64 -d; }' % params_text, - ] - prefix_text = ''.join('{}\n'.format(line) for line in prefix) - script = prefix_text + shell_cmd - return run_with_interp(['bash', '-xeuc'], script) - - def run_python(python_cmd): - params = work.get('parameters', {}) - params_json = json.dumps(params) - params_text = base64.b64encode( - params_json.encode('utf8')).decode('utf8') - prefix = [ - 'import base64, json', - 'params = json.loads(base64.b64decode(' - ' "{}").decode("utf8"))'.format(params_text) - ] - prefix_text = ''.join('{}\n'.format(line) for line in prefix) - script = prefix_text + python_cmd - return run_with_interp(['python3', '-c'], script) - - def run_debootstrap(step): - shell_cmd = "sudo debootstrap '{}' . '{}'".format( - step['debootstrap'], step.get('mirror', 'NOMIRROR')) - run_shell(shell_cmd) - - def run_cmd(step): - logging.info('Running step: %r', step) - if 'shell' in step: - run_shell(step['shell']) - elif 'python' in step: - run_python(step['python']) - elif 'debootstrap' in step: - run_debootstrap(step) - - exit_code = 0 - if work.get('fresh_workspace'): - logging.info('Make an empty workspace') - exit_code = run_shell('find . -delete') - if exit_code == 0: - run_cmd(work['step']) - end_snippet = dict(snippet) - end_snippet['exit_code'] = exit_code - self.post_snippet(snippet_url, end_snippet) - def now(self): return time.strftime('%Y-%m-%dT%H:%M:%S') - def post_snippet(self, url, snippet): - headers = { - 'Content-Type': 'application/json', - } - headers.update(self.get_auth_headers()) - self.show_json('POST {}'.format(url), snippet) - r = requests.post( - url, headers=headers, data=json.dumps(snippet), verify=False) - if not r.ok: - raise cliapp.AppException( - 'Error posting data to controller: {} {!r}'.format( - r.status_code, r.text)) + def worker_factory(self, step): + if 'shell' in step: + return ShellWorker + elif 'python' in step: + return PythonWorker + elif 'debootstrap' in step: + return DebootstrapWorker + return None + + def finish_work(self, work, exit_code): + s = self.get_work_result(work) + s['exit_code'] = exit_code + self._api.report_work(s) + + +class WorkerBase: + + def __init__(self, workspace, post): + self._workspace = workspace + self._post = post + + def do(self, work): + params = work.get('parameters', {}) + params_text = self.params64(params) + argv = self.get_argv(work, params_text) + logging.debug('running: %r', argv) + exit_code, _, _ = cliapp.runcmd_unchecked( + argv, + stdout_callback=lambda data: self._post('stdout', data), + stderr_callback=lambda data: self._post('stderr', data), + cwd=self._workspace, + ) + return exit_code + + def params64(self, params): + as_json = json.dumps(params) + as_bytes = as_json.encode('UTF-8') + as_base64 = base64.b64encode(as_bytes) + return as_base64.decode('UTF-8') + + def get_argv(self, work, params_text): + raise NotImplementedError() + + +class ShellWorker(WorkerBase): + + def get_argv(self, work, params_text): + code_snippet = work['step']['shell'] + prefix = 'params() { echo -n "%s" | base64 -d; }\n' % params_text + return ['bash', '-exuc', prefix + code_snippet] + + +class PythonWorker(WorkerBase): + + def get_argv(self, work, params_text): + prefix = ( + 'import base64, json\n' + 'params = json.loads(base64.b64decode(\n' + ' "{}").decode("utf8"))\n' + ).format(params_text) + code_snippet = work['step']['python'] + return ['python3', '-c', prefix + '\n' + code_snippet] + + +class DebootstrapWorker(WorkerBase): + + def get_argv(self, work, params_text): + step = work['step'] + return [ + 'sudo', + 'debootstrap', + step['debootstrap'], + '.', + step.get('mirror', 'NOMIRROR'), + ] + - def show_msg(self, msg): - sys.stdout.write('------\n{}\n'.format(msg)) - logging.info(msg) +class WorkspaceCleaner(WorkerBase): - def show_json(self, prelude, obj): - msg = '{}: {}'.format(prelude, json.dumps(obj, indent=4)) - sys.stdout.write('------\n{}\n'.format(msg)) - logging.info(msg) + def get_argv(self, work, params_text): + return ['find', '.', '-delete'] -WorkerManager(version=ick2.__version__).run() +if __name__ == '__main__': + WorkerManager(version=ick2.__version__).run() -- cgit v1.2.1