#!/usr/bin/python3
# Copyright 2017 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Worker manager: poll a controller for work and run the steps it returns."""


import base64
import json
import logging
import time

import apifw
import cliapp
import Crypto.PublicKey.RSA
import requests
import urllib3

import ick2


# HTTP requests below are made with verify=False; silence the warnings
# urllib3 would otherwise emit for them.
urllib3.disable_warnings()
logging.captureWarnings(False)


class WorkerManager(cliapp.Application):

    """Command line application that registers a worker and runs its work."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Not read anywhere in this file; token state is kept by
        # TokenGenerator. Retained for backward compatibility.
        self._token = None
        self._token_until = None

    def add_settings(self):
        """Declare the command line / configuration settings."""
        self.settings.string(
            ['controller'],
            'base URL for the controller',
            metavar='URL',
        )

        self.settings.string(
            ['name'],
            'name of this worker',
            metavar='NAME',
        )

        self.settings.string(
            ['token-key'],
            'get token signing private key from FILE',
            metavar='FILE',
        )

        self.settings.string(
            ['token-key-pub'],
            'this is not used',
            metavar='NOPE',
        )

        self.settings.integer(
            ['sleep'],
            'sleep for SECS seconds if there is no work currently',
            metavar='SECS',
            default=5,
        )

        self.settings.string(
            ['workspace'],
            'use DIR as the workspace where commands are run',
            metavar='DIR',
            default='/var/lib/ick/workspace',
        )

    def process_args(self, args):
        """Register with the controller, then poll for work forever.

        Whenever the controller returns work it is executed; otherwise
        the loop sleeps for the configured number of seconds.
        """
        self.settings.require('name')
        self.settings.require('controller')
        # The key file is read unconditionally below, so insist on it
        # up front rather than failing later when opening the file.
        self.settings.require('token-key')

        name = self.settings['name']
        url = self.settings['controller']

        tg = TokenGenerator()
        tg.set_key(self.settings['token-key'])

        api = ContainerAPI(name, url, tg)
        worker = Worker(name, api, self.settings['workspace'])

        logging.info('Worker manager %s starts, controller is %s', name, url)

        api.register()
        while True:
            work = api.get_work()
            if work:
                worker.do_work(work)
            else:
                self.sleep_a_little()

    def sleep_a_little(self):
        """Sleep for the number of seconds given by the 'sleep' setting."""
        secs = self.settings['sleep']
        time.sleep(secs)


class ContainerAPI:

    """Client for the controller's HTTP API, on behalf of one worker."""

    def __init__(self, name, url, token_generator):
        self._name = name
        self._url = url
        self._token_generator = token_generator

    def register(self):
        """POST this worker's name to /workers on the controller.

        A failed registration is logged by HttpApi.post, but is not
        treated as fatal here.
        """
        logging.info('Registering worker %s to %s', self._name, self._url)
        url = self.url('/workers')
        headers = self.get_auth_headers()
        body = {
            'worker': self._name,
        }
        httpapi = HttpApi()
        code = httpapi.post(url, headers, body)
        if code < 400:
            logging.info('Registered worker successfully')

    def get_work(self):
        """GET /work/<name>; return the decoded response, or None."""
        url = self.url('/work/{}'.format(self._name))
        headers = self.get_auth_headers()
        httpapi = HttpApi()
        work = httpapi.get(url, headers)
        if work:
            logging.info('Response: %r', work)
        return work

    def report_work(self, work):
        """POST a work result to /work.

        Raises cliapp.AppException if the controller answers with a
        status code other than 200 or 201.
        """
        logging.info('POST %s', work)
        url = self.url('/work')
        headers = self.get_auth_headers()
        httpapi = HttpApi()
        code = httpapi.post(url, headers, work)
        if code not in (200, 201):
            raise cliapp.AppException(
                'Error posting data to controller: {}'.format(code))

    def url(self, path):
        """Return the controller base URL with path appended."""
        return '{}{}'.format(self._url, path)

    def get_auth_headers(self):
        """Return HTTP headers carrying a bearer token."""
        token = self._token_generator.get_token()
        return {
            'Authorization': 'Bearer {}'.format(token),
        }


class HttpApi:

    """Thin wrapper around requests for JSON POST and GET."""

    # NOTE(review): both methods pass verify=False, which disables TLS
    # certificate verification. Kept as in the original; confirm that
    # this is intended for the deployment.

    def post(self, url, headers, body):
        """POST body as JSON to url and return the HTTP status code."""
        r = requests.post(url, json=body, headers=headers, verify=False)
        if not r.ok:
            logging.warning('Error: POST %s returned %s', url, r.status_code)
        else:
            # This method serves every POST (registration and work
            # reports alike), so the success message is generic.
            logging.info('POST %s succeeded with %s', url, r.status_code)
        return r.status_code

    def get(self, url, headers):
        """GET url and return the decoded JSON body.

        Returns None if the request was not successful, the body is
        empty, or the body is not valid JSON.
        """
        r = requests.get(url, headers=headers, verify=False)
        if not r.ok or not r.text:
            return None
        try:
            return r.json()
        except ValueError:
            # A malformed body must not take down the polling loop;
            # the caller treats None as "no work".
            logging.warning('Error: GET %s returned invalid JSON', url)
            return None


class TokenGenerator:

    """Create and cache a signed access token for the controller API."""

    max_age = 3600  # 1 hour
    sub = 'subject-uuid'
    iss = 'localhost'
    aud = 'localhost'
    scopes = ' '.join([
        'uapi_work_id_get',
        'uapi_work_post',
        'uapi_workers_post',
    ])

    def __init__(self):
        self._token = None
        self._token_key = None
        self._token_until = None

    def is_valid(self, now):
        """Return True if a token is cached and has not expired at now.

        A cached token with no recorded expiry time is considered
        valid indefinitely.
        """
        return (
            self._token is not None
            and (self._token_until is None or now <= self._token_until)
        )

    def set_token(self, token):
        """Use an externally supplied token, with no expiry time."""
        self._token = token
        self._token_until = None
        assert self.is_valid(time.time())

    def set_key(self, filename):
        """Load the RSA key used for signing tokens from filename."""
        key_text = self.cat(filename)
        self._token_key = Crypto.PublicKey.RSA.importKey(key_text)

    def cat(self, filename):
        """Return the contents of filename as text."""
        with open(filename) as f:
            return f.read()

    def get_token(self):
        """Return the cached token, creating a new one when needed."""
        now = time.time()
        if not self.is_valid(now):
            self._token = self.create_token()
            self._token_until = now + self.max_age
            assert self.is_valid(now)
        return self._token

    def create_token(self):
        """Create a new token that expires max_age seconds from now."""
        now = time.time()
        claims = {
            'iss': self.iss,
            'sub': self.sub,
            'aud': self.aud,
            'exp': now + self.max_age,
            'scope': self.scopes,
        }

        token = apifw.create_token(claims, self._token_key)
        return token.decode('ascii')


class Worker:

    """Run the steps handed out by the controller and report results."""

    def __init__(self, name, api, workspace):
        self._name = name
        self._api = api
        self._workspace = workspace

    def do_work(self, work):
        """Execute one unit of work and report its exit code.

        If the work asks for a fresh workspace, the workspace is
        emptied first; the step itself only runs if that succeeded.
        Output produced along the way is reported as it arrives.
        """

        def post(stream_name, data):
            s = self.get_work_result(work)
            # Command output is arbitrary bytes and arrives in chunks,
            # so it may not be valid UTF-8 (or a character may be split
            # across chunks). Replace undecodable bytes instead of
            # crashing the worker.
            s[stream_name] = data.decode('UTF-8', errors='replace')
            self._api.report_work(s)

        step = work['step']
        logging.info('Running step: %r', step)
        exit_code = 0
        if work.get('fresh_workspace'):
            logging.info('Make an empty workspace')
            cleaner = WorkspaceCleaner(self._workspace, post)
            exit_code = cleaner.do(work)
        if exit_code == 0:
            klass = self.worker_factory(step)
            if klass is None:
                logging.error('Unknown step type: %r', step)
                exit_code = -1
            else:
                worker = klass(self._workspace, post)
                exit_code = worker.do(work)
        self.finish_work(work, exit_code)

    def get_work_result(self, work):
        """Return a result template for work, with empty output fields."""
        return {
            'build_id': work['build_id'],
            'worker': self._name,
            'project': work['project'],
            'pipeline': work['pipeline'],
            'stdout': '',
            'stderr': '',
            'exit_code': None,
            'timestamp': self.now(),
        }

    def now(self):
        """Return the current local time formatted as YYYY-MM-DDTHH:MM:SS."""
        return time.strftime('%Y-%m-%dT%H:%M:%S')

    def worker_factory(self, step):
        """Return the worker class matching the step, or None."""
        if 'shell' in step:
            return ShellWorker
        elif 'python' in step:
            return PythonWorker
        elif 'debootstrap' in step:
            return DebootstrapWorker
        return None

    def finish_work(self, work, exit_code):
        """Report the final exit code for work to the controller."""
        s = self.get_work_result(work)
        s['exit_code'] = exit_code
        self._api.report_work(s)


class WorkerBase:

    """Base class for running a command in the workspace.

    Subclasses provide the command line via get_argv.
    """

    def __init__(self, workspace, post):
        self._workspace = workspace
        self._post = post

    def do(self, work):
        """Run the command for work and return its exit code.

        Data from the command's stdout and stderr is passed to the
        post callback given to the constructor.
        """
        params = work.get('parameters', {})
        params_text = self.params64(params)
        argv = self.get_argv(work, params_text)
        logging.debug('running: %r', argv)
        exit_code, _, _ = cliapp.runcmd_unchecked(
            argv,
            stdout_callback=lambda data: self._post('stdout', data),
            stderr_callback=lambda data: self._post('stderr', data),
            cwd=self._workspace,
        )
        return exit_code

    def params64(self, params):
        """Return params serialised as JSON and encoded as base64 text."""
        as_json = json.dumps(params)
        as_bytes = as_json.encode('UTF-8')
        as_base64 = base64.b64encode(as_bytes)
        return as_base64.decode('UTF-8')

    def get_argv(self, work, params_text):
        """Return the argv list to run for work; subclasses override."""
        raise NotImplementedError()


class ShellWorker(WorkerBase):

    """Run the step's 'shell' snippet with bash."""

    def get_argv(self, work, params_text):
        code_snippet = work['step']['shell']
        # Define a params shell function that prints the decoded
        # parameters, ahead of the snippet itself.
        prefix = 'params() { echo -n "%s" | base64 -d; }\n' % params_text
        return ['bash', '-exuc', prefix + code_snippet]


class PythonWorker(WorkerBase):

    """Run the step's 'python' snippet with python3."""

    def get_argv(self, work, params_text):
        # Bind the decoded parameters to a params variable ahead of
        # the snippet itself.
        prefix = (
            'import base64, json\n'
            'params = json.loads(base64.b64decode(\n'
            ' "{}").decode("utf8"))\n'
        ).format(params_text)
        code_snippet = work['step']['python']
        return ['python3', '-c', prefix + '\n' + code_snippet]


class DebootstrapWorker(WorkerBase):

    """Run debootstrap, via sudo, into the current directory."""

    def get_argv(self, work, params_text):
        step = work['step']
        return [
            'sudo',
            'debootstrap',
            step['debootstrap'],
            '.',
            step.get('mirror', 'NOMIRROR'),
        ]


class WorkspaceCleaner(WorkerBase):

    """Delete everything found under the current directory."""

    def get_argv(self, work, params_text):
        return ['find', '.', '-delete']


if __name__ == '__main__':
    WorkerManager(version=ick2.__version__).run()