#!/usr/bin/python3
# Copyright 2017-2018 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


import base64
import json
import logging
import os
import tempfile
import time

import apifw
import cliapp
import Crypto.PublicKey.RSA
import requests
import urllib3

import ick2


urllib3.disable_warnings()
logging.captureWarnings(False)


class WorkerManager(cliapp.Application):

    '''Main program: register with the controller, then loop forever
    fetching work and executing it.'''

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # NOTE(review): token state actually lives in TokenGenerator;
        # these attributes are never read in this module. Kept in case
        # external code pokes at them -- TODO confirm and drop.
        self._token = None
        self._token_until = None

    def add_settings(self):
        '''Declare command line / config file settings.'''
        self.settings.string(
            ['controller'],
            'base URL for the controller',
            metavar='URL',
        )
        self.settings.string(
            ['name'],
            'name of this worker',
            # Was metavar='URL' -- a copy-paste slip; this is a name.
            metavar='NAME',
        )
        self.settings.string(
            ['token-key'],
            'get token signing private key from FILE',
            metavar='FILE',
        )
        self.settings.string(
            ['token-key-pub'],
            'this is not used',
            metavar='NOPE',
        )
        self.settings.integer(
            ['sleep'],
            'sleep for SECS seconds if there is no work currently',
            metavar='SECS',
            default=5,
        )
        self.settings.string(
            ['workspace'],
            'use DIR as the workspace where commands are run',
            metavar='DIR',
            default='/var/lib/ick/workspace',
        )
        self.settings.string(
            ['systree'],
            'use DIR as the system tree for containers',
            metavar='DIR',
            default='/var/lib/ick/systree',
        )

    def process_args(self, args):
        '''Entry point: set up API client and worker, then poll forever.

        This never returns; when there is no work it sleeps for the
        configured number of seconds between polls.
        '''
        self.settings.require('name')
        self.settings.require('controller')

        name = self.settings['name']
        url = self.settings['controller']

        tg = TokenGenerator()
        tg.set_key(self.settings['token-key'])

        api = ControllerAPI(name, url, tg)
        worker = Worker(
            name, api, self.settings['workspace'], self.settings['systree'])

        logging.info('Worker manager %s starts, controller is %s', name, url)

        api.register()
        while True:
            work = api.get_work()
            if work:
                worker.do_work(work)
            else:
                self.sleep_a_little()

    def sleep_a_little(self):
        '''Pause between polls when the controller has no work for us.'''
        secs = self.settings['sleep']
        time.sleep(secs)


class ControllerAPI:

    '''Client for the controller's HTTP API.

    Wraps registration, work polling, result reporting and blob
    transfer, adding a signed bearer token to every request.
    '''

    def __init__(self, name, url, token_generator):
        self._name = name
        self._url = url
        self._token_generator = token_generator
        self._httpapi = HttpApi()

    def register(self):
        '''Tell the controller this worker exists.'''
        logging.info('Registering worker %s to %s', self._name, self._url)
        url = self.url('/workers')
        headers = self.get_auth_headers()
        body = {
            'worker': self._name,
        }
        self._httpapi.post(url, headers, body)

    def get_work(self):
        '''Ask the controller for work; return parsed JSON or None.'''
        url = self.url('/work/{}'.format(self._name))
        headers = self.get_auth_headers()
        work = self._httpapi.get(url, headers)
        if work:
            logging.info('Response: %r', work)
        return work

    def report_work(self, work):
        '''POST a work status update; raise on a non-2xx response.'''
        logging.info('POST %s', work)
        url = self.url('/work')
        headers = self.get_auth_headers()
        code = self._httpapi.post(url, headers, work)
        if code not in (200, 201):
            raise cliapp.AppException(
                'Error posting data to controller: {}'.format(code))

    def upload_blob(self, blob_id, blob):
        '''PUT a blob (bytes) to the controller's blob store.

        Failures are logged by HttpApi.put; the status code was
        previously captured in an unused variable, which is dropped.
        '''
        logging.info('Upload blob %s', blob_id)
        url = self.url('/blobs/{}'.format(blob_id))
        headers = self.get_auth_headers()
        self._httpapi.put(url, headers, blob)

    def download_blob(self, blob_id):
        '''GET a blob from the controller; return bytes or None.'''
        logging.info('Download blob %s', blob_id)
        url = self.url('/blobs/{}'.format(blob_id))
        headers = self.get_auth_headers()
        return self._httpapi.get_blob(url, headers)

    def url(self, path):
        '''Join the controller base URL and a path.'''
        return '{}{}'.format(self._url, path)

    def get_auth_headers(self):
        '''Return headers carrying a fresh signed bearer token.'''
        token = self._token_generator.get_token()
        return {
            'Authorization': 'Bearer {}'.format(token),
        }


class HttpApi:

    '''Thin wrapper around a requests.Session.

    NOTE(review): every request uses verify=False, disabling TLS
    certificate verification -- presumably because the controller uses
    a self-signed certificate. This is insecure against MITM; confirm
    whether a CA bundle can be used instead.
    '''

    def __init__(self):
        self._session = requests.Session()

    def post(self, url, headers, body):
        '''POST a JSON body; return the HTTP status code.'''
        r = self._session.post(url, json=body, headers=headers, verify=False)
        if not r.ok:
            logging.warning('Error: POST %s returned %s', url, r.status_code)
        return r.status_code

    def put(self, url, headers, body):
        '''PUT a raw body; return the HTTP status code.'''
        r = self._session.put(url, data=body, headers=headers, verify=False)
        if not r.ok:
            logging.warning('Error: PUT %s returned %s', url, r.status_code)
        return r.status_code

    def get(self, url, headers):
        '''GET and parse a JSON response; None on error or empty body.'''
        r = self._session.get(url, headers=headers, verify=False)
        if not r.ok or not r.text:
            return None
        return r.json()

    def get_blob(self, url, headers):
        '''GET a raw (binary) response body; None on error.'''
        r = self._session.get(url, headers=headers, verify=False)
        if not r.ok:
            return None
        return r.content


class TokenGenerator:

    '''Create and cache signed bearer tokens for the controller API.

    Tokens are signed with an RSA private key (set_key) and cached for
    max_age seconds before being regenerated.
    '''

    max_age = 3600  # 1 hour

    sub = 'subject-uuid'
    iss = 'localhost'
    aud = 'localhost'
    scopes = ' '.join([
        'uapi_work_id_get',
        'uapi_work_post',
        'uapi_workers_post',
        'uapi_blobs_id_get',
        'uapi_blobs_id_put',
    ])

    def __init__(self):
        self._token = None
        self._token_key = None
        self._token_until = None

    def is_valid(self, now):
        '''Is the cached token usable at time now?

        A token with no expiry recorded (set via set_token) never
        expires.
        '''
        return (
            self._token is not None and
            (self._token_until is None or now <= self._token_until)
        )

    def set_token(self, token):
        '''Install an externally supplied token with no expiry.'''
        self._token = token
        self._token_until = None
        assert self.is_valid(time.time())

    def set_key(self, filename):
        '''Load the RSA signing key from a PEM file.'''
        key_text = self.cat(filename)
        self._token_key = Crypto.PublicKey.RSA.importKey(key_text)

    def cat(self, filename):
        '''Return the whole contents of a text file.'''
        with open(filename) as f:
            return f.read()

    def get_token(self):
        '''Return a valid token, minting a new one if needed.'''
        now = time.time()
        if not self.is_valid(now):
            self._token = self.create_token()
            self._token_until = now + self.max_age
            assert self.is_valid(now)
        return self._token

    def create_token(self):
        '''Sign and return a fresh token with the standard claims.'''
        now = time.time()
        claims = {
            'iss': self.iss,
            'sub': self.sub,
            'aud': self.aud,
            'exp': now + self.max_age,
            'scope': self.scopes,
        }
        token = apifw.create_token(claims, self._token_key)
        return token.decode('ascii')


class Worker:

    '''Execute one unit of work and report the results.'''

    def __init__(self, name, api, workspace, systree):
        self._name = name
        self._api = api
        self._workspace = workspace
        self._systree = systree

    def do_work(self, work):
        '''Run the step in work and report its exit code.

        Output produced while the step runs is streamed back to the
        controller via the post closure. An unknown step type is
        reported as exit code -1.
        '''
        def post(stream_name, data):
            # Send one chunk of captured output as a partial result.
            s = self.get_work_result(work)
            s[stream_name] = data
            self._api.report_work(s)

        step = work['step']
        logging.info('Running step: %r', step)

        # The original had a dead "exit_code = 0; if exit_code == 0:"
        # scaffold here; the condition was always true.
        klass = self.worker_factory(step)
        if klass is None:
            exit_code = -1
        else:
            worker = klass(self._api, self._workspace, self._systree, post)
            exit_code = worker.do(work)

        self.finish_work(work, exit_code)

    def get_work_result(self, work):
        '''Build the skeleton of a work result record.'''
        return {
            'build_id': work['build_id'],
            'worker': self._name,
            'project': work['project'],
            'pipeline': work['pipeline'],
            'stdout': '',
            'stderr': '',
            'exit_code': None,
            'timestamp': self.now(),
        }

    def now(self):
        '''Current local time as an ISO-8601-ish timestamp string.'''
        return time.strftime('%Y-%m-%dT%H:%M:%S')

    def worker_factory(self, step):
        '''Pick the worker class for a step; None if unrecognised.

        Each table row is (step key, required value or None for any,
        worker class); the first matching row wins.
        '''
        table = [
            ('shell', None, ShellWorker),
            ('python', None, PythonWorker),
            ('debootstrap', None, DebootstrapWorker),
            ('archive', None, WorkspaceArchiver),
            ('action', 'populate_systree', SystreePopulator),
            ('action', 'create_workspace', WorkspaceCleaner),
        ]
        for key, value, klass in table:
            if key in step and (value is None or step[key] == value):
                return klass
        logging.warning('Cannot find worker for %s', step)
        return None

    def finish_work(self, work, exit_code):
        '''Report the final result with the step's exit code.'''
        s = self.get_work_result(work)
        s['exit_code'] = exit_code
        self._api.report_work(s)


class Runner:

    '''Run a command, buffering and posting its stdout/stderr.

    Output is accumulated per stream and flushed to the post callback
    when a buffer reaches _maxbuf bytes, on the periodic timeout
    callback, and once after the command exits.
    '''

    def __init__(self, post_output):
        self._post = post_output
        self._buffers = {'stdout': '', 'stderr': ''}
        self._maxbuf = 2**10
        self._timeout = 1.0

    def runcmd(self, argv, **kwargs):
        '''Run argv, streaming output; return its exit code.'''
        logging.debug('Runner.runcmd: argv=%r %r', argv, kwargs)
        exit_code, _, _ = cliapp.runcmd_unchecked(
            argv,
            stdout_callback=self.capture_stdout,
            stderr_callback=self.capture_stderr,
            timeout_callback=self.flush,
            **kwargs
        )
        self.flush()
        logging.debug('Runner.runcmd: finished, exit_code=%r', exit_code)
        return exit_code

    def capture_stdout(self, data):
        return self.capture('stdout', data)

    def capture_stderr(self, data):
        return self.capture('stderr', data)

    def capture(self, stream_name, data):
        '''Append data to a stream buffer, flushing if it is full.'''
        self._buffers[stream_name] += data.decode('UTF-8')
        if len(self._buffers[stream_name]) >= self._maxbuf:
            self.flush()
        return None

    def flush(self):
        '''Post all buffered output.

        BUG FIX: the original returned from inside the loop after
        posting the first non-empty buffer, so when both stdout and
        stderr held data at the final flush, one stream's output was
        silently dropped. Now every non-empty buffer is posted.
        '''
        for stream in self._buffers:
            buf = self._buffers[stream]
            self._buffers[stream] = ''
            if buf:
                logging.debug('Posting %d bytes of %s', len(buf), stream)
                self._post(stream, buf)


class WorkerBase:

    '''Base class for step workers.

    Subclasses either override get_argv (the command to run, executed
    via do) or override do entirely for non-command steps.
    '''

    def __init__(self, api, workspace, systree, post):
        self._api = api
        self._workspace = workspace
        self._systree = systree
        self._post = post

    def do(self, work):
        '''Run the step's command, honouring its "where" field.

        "chroot" runs the command chrooted into the workspace;
        "container" runs it in a systemd-nspawn container using the
        systree with the workspace bind-mounted in; anything else runs
        directly on the host. Returns the command's exit code.
        '''
        params = work.get('parameters', {})
        params_text = self.params64(params)
        argv = self.get_argv(work, params_text)
        if self.where(work) == 'chroot':
            logging.debug('CHROOT REQUESTED')
            argv = ['sudo', 'chroot', self._workspace] + argv
        elif self.where(work) == 'container':
            logging.debug('CONTAINER REQUESTED')
            argv = [
                'sudo', 'systemd-nspawn', '-D', self._systree,
                '--bind', self._workspace,
            ] + argv
        else:
            logging.debug('HOST REQUESTED')
        runner = Runner(self._post)
        return runner.runcmd(argv, cwd=self._workspace)

    def report_stdout(self, data):
        return self._post('stdout', data.decode('UTF-8'))

    def report_stderr(self, data):
        return self._post('stderr', data.decode('UTF-8'))

    def params64(self, params):
        '''Encode the parameters dict as base64(JSON) text.'''
        as_json = json.dumps(params)
        as_bytes = as_json.encode('UTF-8')
        as_base64 = base64.b64encode(as_bytes)
        return as_base64.decode('UTF-8')

    def where(self, work):
        '''Return the step's "where" field, or None if unset.'''
        step = work.get('step', {})
        return step.get('where')

    def get_argv(self, work, params_text):
        raise NotImplementedError()


class ShellWorker(WorkerBase):

    '''Run a "shell" step: a bash snippet with a params helper.'''

    def get_argv(self, work, params_text):
        step = work['step']
        code_snippet = step['shell']
        # The prefix defines a params() shell function that emits the
        # step's parameters as JSON.
        prefix = 'params() { echo -n "%s" | base64 -d; }\n' % params_text
        return ['bash', '-exuc', prefix + code_snippet]


class PythonWorker(WorkerBase):

    '''Run a "python" step: a Python snippet with params pre-decoded.'''

    def get_argv(self, work, params_text):
        # The prefix binds the decoded parameters to a params variable
        # before the user's snippet runs.
        prefix = (
            'import base64, json\n'
            'params = json.loads(base64.b64decode(\n'
            '    "{}").decode("utf8"))\n'
        ).format(params_text)
        code_snippet = work['step']['python']
        return ['python3', '-c', prefix + '\n' + code_snippet]


class DebootstrapWorker(WorkerBase):

    '''Run a "debootstrap" step to build a base system in the cwd.'''

    def get_argv(self, work, params_text):
        step = work['step']
        return [
            'sudo', 'debootstrap', step['debootstrap'],
            '.', step.get('mirror', 'NOMIRROR'),
        ]


class WorkspaceCleaner(WorkerBase):

    '''Handle the "create_workspace" action by emptying the workspace.'''

    def get_argv(self, work, params_text):
        return ['sudo', 'find', '.', '-delete']


class WorkspaceArchiver(WorkerBase):

    '''Handle an "archive" step: tar up a directory and upload it.'''

    def do(self, work):
        params = work.get('parameters', {})
        step = work['step']
        dirname = step['archive']
        if dirname == 'workspace':
            dirname = self._workspace
        blob_id = params.get('systree_name', 'noname')
        tarball = self.archive(dirname)
        self._api.upload_blob(blob_id, tarball)
        return 0

    def archive(self, dirname):
        '''Create a gzipped tarball of dirname.

        Returns the tarball contents (bytes) on success, or tar's
        non-zero exit code on failure.

        BUG FIX: the original only removed the temporary file on the
        failure path, leaking one temp file per successful archive;
        the try/finally removes it in both cases.
        '''
        logging.info('Archiving %s', dirname)
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        try:
            argv = ['sudo', 'tar', '-zcvf', filename, '-C', dirname, '.']
            exit_code, _, _ = cliapp.runcmd_unchecked(
                argv,
                stdout_callback=self.report,
                stderr_callback=self.report,
                cwd=self._workspace,
            )
            self.report(b'Created tarball of workspace')
            if exit_code == 0:
                with open(filename, 'rb') as f:
                    return f.read()
            return exit_code
        finally:
            os.remove(filename)

    def report(self, data):
        self._post('stdout', data.decode('UTF-8'))

    def get_argv(self, work, params_text):
        # Never called: this class overrides do(). Was "assert False",
        # which disappears under python -O.
        raise NotImplementedError()


class SystreePopulator(WorkerBase):

    '''Handle the "populate_systree" action: download and unpack a
    systree tarball into the fixed systree directory.'''

    systree_dir = '/var/lib/ick/systree'

    def do(self, work):
        step = work['step']
        systree_name = step.get('systree_name')
        if not systree_name:
            self.report(
                b'No systree_name field in action, no systree population\n')
            return 1

        self.make_directory_empty(self.systree_dir)
        tarball = self._api.download_blob(systree_name)
        self.unpack_systree(tarball, self.systree_dir)
        self.report(b'Systree has been populated\n')
        return 0

    def make_directory_empty(self, dirname):
        # GNU find defaults to the current directory, which is dirname
        # via cwd, so this empties dirname in place.
        return self.execute_argv(['sudo', 'find', '-delete'], cwd=dirname)

    def unpack_systree(self, tarball, dirname):
        '''Unpack the tarball (bytes, fed on stdin) into dirname.'''
        return self.execute_argv(
            ['sudo', 'tar', '-zxf', '-', '-C', dirname],
            feed_stdin=tarball,
        )

    def execute_argv(self, argv, **kwargs):
        '''Run argv, streaming its output; return the exit code.'''
        exit_code, _, _ = cliapp.runcmd_unchecked(
            argv,
            stdout_callback=self.report,
            stderr_callback=self.report,
            **kwargs,
        )
        return exit_code

    def report(self, data):
        self._post('stdout', data.decode('UTF-8'))

    def get_argv(self, work, params_text):
        # Never called: this class overrides do(). Was "assert False",
        # which disappears under python -O.
        raise NotImplementedError()


if __name__ == '__main__':
    WorkerManager(version=ick2.__version__).run()