#!/usr/bin/python3
# Copyright 2017-2018 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''Ick worker manager: poll a controller for work and execute build steps.

The manager registers itself with the controller, then loops forever:
fetch one unit of work, dispatch it to the matching worker class
(shell, python, debootstrap, archive, or a built-in action), stream the
command output back to the controller, and finally report the exit code.
'''


import base64
import json
import logging
import os
import subprocess
import tempfile
import time

import apifw
import cliapp
import Crypto.PublicKey.RSA
import requests
import urllib3

import ick2


# All HTTP to the controller uses verify=False (see HttpApi), which would
# otherwise flood the log with urllib3 InsecureRequestWarning messages.
urllib3.disable_warnings()
logging.captureWarnings(False)


class WorkerManager(cliapp.Application):

    '''Command line application: register with controller, loop doing work.'''

    def add_settings(self):
        '''Declare the command line / config file settings.'''
        self.settings.string(
            ['controller'],
            'base URL for the controller',
            metavar='URL',
        )
        self.settings.string(
            ['name'],
            'name of this worker',
            # Was metavar='URL', which was wrong: the value is a worker name.
            metavar='NAME',
        )
        self.settings.string(
            ['token-key'],
            'get token signing private key from FILE',
            metavar='FILE',
        )
        self.settings.string(
            ['token-key-pub'],
            # Kept for command line compatibility even though nothing
            # reads this setting.
            'this is not used',
            metavar='NOPE',
        )
        self.settings.integer(
            ['sleep'],
            'sleep for SECS seconds if there is no work currently',
            metavar='SECS',
            default=5,
        )
        self.settings.string(
            ['workspace'],
            'use DIR as the workspace where commands are run',
            metavar='DIR',
            default='/var/lib/ick/workspace',
        )
        self.settings.string(
            ['systree'],
            'use DIR as the system tree for containers',
            metavar='DIR',
            default='/var/lib/ick/systree',
        )

    def process_args(self, args):
        '''Run the main loop.  Does not return normally.'''
        self.settings.require('name')
        self.settings.require('controller')

        name = self.settings['name']
        url = self.settings['controller']

        tg = TokenGenerator()
        tg.set_key(self.settings['token-key'])

        api = ControllerAPI(name, url, tg)
        worker = Worker(
            name, api, self.settings['workspace'], self.settings['systree'])

        logging.info('Worker manager %s starts, controller is %s', name, url)

        api.register()

        while True:
            work = api.get_work()
            if work:
                worker.do_work(work)
            else:
                # No work available: back off so we do not hammer the
                # controller with requests.
                self.sleep_a_little()

    def sleep_a_little(self):
        '''Sleep for the configured number of seconds.'''
        secs = self.settings['sleep']
        time.sleep(secs)


class ControllerAPI:

    '''Client for the controller's HTTP API, with bearer-token auth.'''

    def __init__(self, name, url, token_generator):
        self._name = name
        self._url = url
        self._token_generator = token_generator
        self._httpapi = HttpApi()

    def register(self):
        '''Tell the controller this worker exists.'''
        logging.info('Registering worker %s to %s', self._name, self._url)
        url = self.url('/workers')
        headers = self.get_auth_headers()
        body = {
            'worker': self._name,
        }
        self._httpapi.post(url, headers, body)

    def get_work(self):
        '''Fetch the next work item for this worker, or None if none.'''
        url = self.url('/work/{}'.format(self._name))
        headers = self.get_auth_headers()
        work = self._httpapi.get(url, headers)
        if work:
            logging.info('Response: %r', work)
        return work

    def report_work(self, work):
        '''Post a (partial or final) work result; raise on HTTP failure.'''
        logging.info('POST %s', work)
        url = self.url('/work')
        headers = self.get_auth_headers()
        code = self._httpapi.post(url, headers, work)
        if code not in (200, 201):
            raise cliapp.AppException(
                'Error posting data to controller: {}'.format(code))

    def upload_blob(self, blob_id, blob):
        '''Upload a blob; best-effort, failures are only logged by HttpApi.'''
        logging.info('Upload blob %s', blob_id)
        url = self.url('/blobs/{}'.format(blob_id))
        headers = self.get_auth_headers()
        # The status code was previously bound to an unused local; the
        # PUT is deliberately best-effort, so it is simply not checked.
        self._httpapi.put(url, headers, blob)

    def download_blob(self, blob_id):
        '''Download a blob as bytes, or None if the request fails.'''
        logging.info('Download blob %s', blob_id)
        url = self.url('/blobs/{}'.format(blob_id))
        headers = self.get_auth_headers()
        return self._httpapi.get_blob(url, headers)

    def url(self, path):
        '''Return full URL for a path on the controller.'''
        return '{}{}'.format(self._url, path)

    def get_auth_headers(self):
        '''Return HTTP headers carrying a signed bearer token.'''
        token = self._token_generator.get_token()
        return {
            'Authorization': 'Bearer {}'.format(token),
        }


class HttpApi:

    '''Thin wrapper around a requests session.

    NOTE(review): every request uses verify=False, i.e. the server's TLS
    certificate is not checked.  Presumably intentional for self-signed
    controller certificates -- confirm before using over a hostile network.
    '''

    def __init__(self):
        self._session = requests.Session()

    def post(self, url, headers, body):
        '''POST a JSON body; return the HTTP status code.'''
        r = self._session.post(url, json=body, headers=headers, verify=False)
        if not r.ok:
            logging.warning('Error: POST %s returned %s', url, r.status_code)
        return r.status_code

    def put(self, url, headers, body):
        '''PUT a raw body; return the HTTP status code.'''
        r = self._session.put(url, data=body, headers=headers, verify=False)
        if not r.ok:
            logging.warning('Error: PUT %s returned %s', url, r.status_code)
        return r.status_code

    def get(self, url, headers):
        '''GET a JSON resource; return parsed body, or None on error/empty.'''
        r = self._session.get(url, headers=headers, verify=False)
        if not r.ok or not r.text:
            return None
        return r.json()

    def get_blob(self, url, headers):
        '''GET a binary resource; return bytes, or None on error.'''
        r = self._session.get(url, headers=headers, verify=False)
        if not r.ok:
            return None
        return r.content


class TokenGenerator:

    '''Create and cache signed JWT bearer tokens for the controller API.'''

    max_age = 3600  # token lifetime in seconds (1 hour)

    sub = 'subject-uuid'
    iss = 'localhost'
    aud = 'localhost'
    scopes = ' '.join([
        'uapi_work_id_get',
        'uapi_work_post',
        'uapi_workers_post',
        'uapi_blobs_id_get',
        'uapi_blobs_id_put',
    ])

    def __init__(self):
        self._token = None
        self._token_key = None
        self._token_until = None  # expiry time; None means "never expires"

    def is_valid(self, now):
        '''Is the cached token usable at time now?'''
        return (
            self._token is not None and
            (self._token_until is None or now <= self._token_until)
        )

    def set_token(self, token):
        '''Install an externally supplied token that never expires.'''
        self._token = token
        self._token_until = None
        assert self.is_valid(time.time())

    def set_key(self, filename):
        '''Load the RSA private key used to sign tokens from a PEM file.'''
        key_text = self.cat(filename)
        self._token_key = Crypto.PublicKey.RSA.importKey(key_text)

    def cat(self, filename):
        '''Return the contents of a text file.'''
        with open(filename) as f:
            return f.read()

    def get_token(self):
        '''Return a valid token, creating a new one if the cache expired.'''
        now = time.time()
        if not self.is_valid(now):
            self._token = self.create_token()
            self._token_until = now + self.max_age
            assert self.is_valid(now)
        return self._token

    def create_token(self):
        '''Create and sign a fresh token with the configured claims.'''
        now = time.time()
        claims = {
            'iss': self.iss,
            'sub': self.sub,
            'aud': self.aud,
            'exp': now + self.max_age,
            'scope': self.scopes,
        }
        token = apifw.create_token(claims, self._token_key)
        return token.decode('ascii')


class Worker:

    '''Dispatch a work item to the right worker class and report results.'''

    def __init__(self, name, api, workspace, systree):
        self._name = name
        self._api = api
        self._workspace = workspace
        self._systree = systree

    def do_work(self, work):
        '''Execute one work item and report its final exit code.'''

        def post(stream_name, data):
            # Report one partial result: a chunk of stdout or stderr.
            s = self.get_work_result(work)
            s[stream_name] = data
            self._api.report_work(s)

        step = work['step']
        logging.info('Running step: %r', step)

        # The original had a vacuous "exit_code = 0; if exit_code == 0:"
        # guard here; the branch was always taken, so it is gone.
        klass = self.worker_factory(step)
        if klass is None:
            exit_code = -1
        else:
            worker = klass(self._api, self._workspace, self._systree, post)
            exit_code = worker.do(work)

        self.finish_work(work, exit_code)

    def get_work_result(self, work):
        '''Return a fresh result record template for a work item.'''
        return {
            'build_id': work['build_id'],
            'worker': self._name,
            'project': work['project'],
            'pipeline': work['pipeline'],
            'stdout': '',
            'stderr': '',
            'exit_code': None,
            'timestamp': self.now(),
        }

    def now(self):
        '''Return current local time as an ISO 8601-ish timestamp string.'''
        return time.strftime('%Y-%m-%dT%H:%M:%S')

    def worker_factory(self, step):
        '''Pick the worker class for a step, or None if none matches.

        A table row (key, value, klass) matches when the step has the
        key, and either value is None (any value matches) or the step's
        value for the key equals value.
        '''
        table = [
            ('shell', None, ShellWorker),
            ('python', None, PythonWorker),
            ('debootstrap', None, DebootstrapWorker),
            ('archive', None, WorkspaceArchiver),
            ('action', 'populate_systree', SystreePopulator),
            ('action', 'create_workspace', WorkspaceCleaner),
        ]
        for key, value, klass in table:
            if key in step and (value is None or step[key] == value):
                return klass
        logging.warning('Cannot find worker for %s', step)
        return None

    def finish_work(self, work, exit_code):
        '''Report the final result of a work item, with its exit code.'''
        s = self.get_work_result(work)
        s['exit_code'] = exit_code
        self._api.report_work(s)


class Runner:

    '''Run a command, streaming its output to the controller in chunks.

    Output is buffered locally and flushed (posted via the callback)
    whenever a buffer grows past _maxbuf bytes, when the command has
    been silent for _timeout seconds, or when the command finishes.
    '''

    def __init__(self, post_output):
        self._post = post_output
        self._buffers = {'stdout': '', 'stderr': ''}
        self._maxbuf = 1024 * 256
        self._timeout = 1.0

    def runcmd(self, argv, **kwargs):
        '''Run argv; return its exit code.  Output goes via callbacks.'''
        logging.debug('Runner.runcmd: argv=%r %r', argv, kwargs)
        # stderr is merged into stdout (stderr=subprocess.STDOUT), so
        # only the stdout callback ever fires.
        exit_code, _, _ = cliapp.runcmd_unchecked(
            argv,
            stdout_callback=self.capture_stdout,
            stderr=subprocess.STDOUT,
            output_timeout=self._timeout,
            timeout_callback=self.flush,
            **kwargs
        )
        self.flush()
        logging.debug('Runner.runcmd: finished, exit_code=%r', exit_code)
        return exit_code

    def capture_stdout(self, data):
        '''Buffer a chunk of stdout.'''
        return self.capture('stdout', data)

    def capture_stderr(self, data):
        '''Buffer a chunk of stderr (unused while stderr is merged).'''
        return self.capture('stderr', data)

    def capture(self, stream_name, data):
        '''Append data to a stream buffer, flushing when it gets big.'''
        logging.debug('CAPTURE %s: %r', stream_name, data)
        self._buffers[stream_name] += data.decode('UTF-8')
        if len(self._buffers[stream_name]) >= self._maxbuf:
            self.flush()
        # cliapp expects the callback to return the data to keep; we
        # keep nothing, having buffered it ourselves.
        return b''

    def flush(self):
        '''Post all buffered output and empty the buffers.'''
        logging.debug('flushing: self._buffers=%r', self._buffers)
        streams = list(self._buffers.keys())
        logging.debug('streams: %r', streams)
        for stream in streams:
            buf = self._buffers[stream]
            logging.debug('FLUSH: %s: %r', stream, buf)
            self._buffers[stream] = ''
            if buf:
                logging.debug('Posting %d bytes of %s', len(buf), stream)
                self._post(stream, buf)


class Mounter:

    '''Context manager to bind-mount a list of (dir, mountpoint) pairs.'''

    def __init__(self, mounts, runner):
        self._mounts = mounts
        self._runner = runner

    def __enter__(self):
        self.mount()
        return self

    def __exit__(self, *args):
        self.unmount()

    def mount(self):
        '''Bind-mount each pair, creating missing mount points.'''
        for dirname, mp in self._mounts:
            if not os.path.exists(mp):
                os.mkdir(mp)
            self._runner.runcmd(['sudo', 'mount', '--bind', dirname, mp])

    def unmount(self):
        '''Unmount in reverse order of mounting.'''
        for _, mp in reversed(self._mounts):
            self._runner.runcmd(['sudo', 'umount', mp])


class WorkerBase:

    '''Base class for all step workers.

    Subclasses implement get_argv (and may override do).  The step's
    "where" field picks the execution environment: "chroot" (the
    workspace as a chroot), "container" (systemd-nspawn with the systree
    as root and the workspace bind-mounted), or anything else = host.
    '''

    def __init__(self, api, workspace, systree, post):
        self._api = api
        self._workspace = workspace
        self._systree = systree
        self._post = post

    def do(self, work):
        '''Run the step's command in the requested environment.'''
        params = work.get('parameters', {})
        params_text = self.params64(params)
        argv = self.get_argv(work, params_text)

        mounts = []
        env = dict(os.environ)
        env.update({
            'LC_ALL': 'C',
            'DEBIAN_FRONTEND': 'noninteractive',
        })

        if self.where(work) == 'chroot':
            logging.debug('CHROOT REQUESTED')
            argv = ['sudo', 'chroot', self._workspace] + argv
            # A chroot needs /proc and /sys for many tools to work.
            mounts = [
                ('/proc', os.path.join(self._workspace, 'proc')),
                ('/sys', os.path.join(self._workspace, 'sys')),
            ]
        elif self.where(work) == 'container':
            logging.debug('CONTAINER REQUESTED')
            bind = '{}:/workspace'.format(self._workspace)
            mp = '{}/workspace'.format(self._workspace)
            if not os.path.exists(mp):
                os.mkdir(mp)
            argv = [
                'sudo', 'systemd-nspawn',
                '-D', self._systree,
                '--bind', bind,
                '--chdir', '/workspace',
            ] + argv
        else:
            logging.debug('HOST REQUESTED')

        runner = Runner(self._post)
        with Mounter(mounts, runner):
            return runner.runcmd(argv, cwd=self._workspace, env=env)

    def params64(self, params):
        '''Return the step parameters as base64-encoded JSON text.'''
        as_json = json.dumps(params)
        as_bytes = as_json.encode('UTF-8')
        as_base64 = base64.b64encode(as_bytes)
        return as_base64.decode('UTF-8')

    def where(self, work):
        '''Return the step's "where" field, or None if absent.'''
        step = work.get('step', {})
        return step.get('where')

    def get_argv(self, work, params_text):
        '''Return the command argv for this step.  Subclass must define.'''
        raise NotImplementedError()


class ShellWorker(WorkerBase):

    '''Run a shell snippet; a params() function prints the parameters.'''

    def get_argv(self, work, params_text):
        step = work['step']
        code_snippet = step['shell']
        # The snippet can call params() to get its parameters as JSON.
        prefix = 'params() { echo -n "%s" | base64 -d; }\n' % params_text
        return ['bash', '-exuc', prefix + code_snippet]


class PythonWorker(WorkerBase):

    '''Run a Python snippet with a predefined params dict.'''

    def get_argv(self, work, params_text):
        # The snippet sees its parameters as a dict named params.
        prefix = (
            'import base64, json\n'
            'params = json.loads(base64.b64decode(\n'
            '    "{}").decode("utf8"))\n'
        ).format(params_text)
        code_snippet = work['step']['python']
        return ['python3', '-c', prefix + '\n' + code_snippet]


class DebootstrapWorker(WorkerBase):

    '''Debootstrap a Debian system into the workspace.'''

    def get_argv(self, work, params_text):
        step = work['step']
        params = work.get('parameters', {})
        # "auto" means: take the suite from the debian_codename parameter.
        if step['debootstrap'] == 'auto':
            suite = params['debian_codename']
        else:
            suite = step['debootstrap']
        return [
            'sudo', 'debootstrap', suite, '.',
            step.get('mirror', 'http://deb.debian.org/debian'),
        ]


class WorkspaceCleaner(WorkerBase):

    '''Empty the workspace directory.'''

    def get_argv(self, work, params_text):
        return ['sudo', 'find', '.', '-delete']


class WorkspaceArchiver(WorkerBase):

    '''Tar up a directory and upload it to the controller as a blob.'''

    def do(self, work):
        params = work.get('parameters', {})
        blob_id = params.get('systree_name', 'noname')
        step = work['step']
        dirname = step['archive']
        # "workspace" is shorthand for the configured workspace directory.
        if dirname == 'workspace':
            dirname = self._workspace

        runner = Runner(self._post)
        temp = self.archive(runner, dirname)
        # archive() returns an int exit code on failure, a filename on
        # success.
        if isinstance(temp, int):
            return temp

        result = self.upload(runner, temp, blob_id)
        os.remove(temp)
        return result

    def archive(self, runner, dirname):
        '''Create a gzipped tarball of dirname.

        Return the tarball's filename, or the tar exit code (an int) on
        failure.
        '''
        logging.info('Archiving %s', dirname)
        self.report(b'Archiving workspace\n')
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        archive = ['sudo', 'tar', '-zcf', filename, '-C', dirname, '.']
        exit_code = runner.runcmd(archive)
        if exit_code != 0:
            os.remove(filename)
            return exit_code
        return filename

    def upload(self, runner, filename, blob_id):
        '''Upload the tarball with curl; return curl's exit code.'''
        logging.info('Uploading %s', blob_id)
        self.report(b'Uploading workspace\n')
        url = self._api.url('/blobs/{}'.format(blob_id))
        headers = self._api.get_auth_headers()
        upload = ['curl', '-sk', '-T', filename]
        for name, value in headers.items():
            header = '{}: {}'.format(name, value)
            upload.extend(['-H', header])
        upload.append(url)
        exit_code = runner.runcmd(upload)
        if exit_code == 0:
            self.report(b'Uploaded tarball of workspace\n')
        return exit_code

    def execute_argv(self, *argvs, **kwargs):
        '''Run command(s) directly; return the exit code.

        NOTE(review): not called within this class (archive and upload
        use Runner); kept for interface compatibility.
        '''
        exit_code, _, _ = cliapp.runcmd_unchecked(
            *argvs,
            **kwargs,
        )
        return exit_code

    def report(self, data):
        '''Post bytes of progress output to the controller as stdout.'''
        self._post('stdout', data.decode('UTF-8'))
        return b''

    def get_argv(self, work, params_text):
        # Never used: this class overrides do() entirely.
        raise NotImplementedError()


class SystreePopulator(WorkerBase):

    '''Download a systree blob and unpack it into the systree directory.'''

    systree_dir = '/var/lib/ick/systree'

    def do(self, work):
        step = work['step']
        systree_name = step.get('systree_name')
        if not systree_name:
            self.report(
                b'No systree_name field in action, no systree population\n')
            return 1

        self.make_directory_empty(self.systree_dir)
        self.report(b'Downloading and unpacking systree blob\n')
        return self.download_and_unpack_systree(systree_name, self.systree_dir)

    def make_directory_empty(self, dirname):
        '''Delete everything under dirname; return find's exit code.'''
        # The explicit '.' start path matches WorkspaceCleaner and is
        # required by POSIX find (GNU find merely defaults to it).
        return self.execute_argv(['sudo', 'find', '.', '-delete'], cwd=dirname)

    def download_and_unpack_systree(self, systree_name, dirname):
        '''Pipe a curl download of the blob into tar; return exit code.'''
        url = self._api.url('/blobs/{}'.format(systree_name))
        headers = self._api.get_auth_headers()
        download = ['curl', '-k']
        for name, value in headers.items():
            header = '{}: {}'.format(name, value)
            download.extend(['-H', header])
        download.append(url)
        unpack = ['sudo', 'tar', '-zxf', '-', '-C', dirname]
        # Two argvs: cliapp pipes the first command's stdout into the
        # second.
        return self.execute_argv(download, unpack)

    def execute_argv(self, *argvs, **kwargs):
        '''Run command(s), streaming their output; return the exit code.'''
        exit_code, _, _ = cliapp.runcmd_unchecked(
            *argvs,
            stdout_callback=self.report,
            stderr_callback=self.report,
            **kwargs,
        )
        return exit_code

    def report(self, data):
        '''Post bytes of progress output to the controller as stdout.'''
        self._post('stdout', data.decode('UTF-8'))

    def get_argv(self, work, params_text):
        # Never used: this class overrides do() entirely.
        raise NotImplementedError()


if __name__ == '__main__':
    WorkerManager(version=ick2.__version__).run()