#!/usr/bin/env python3
#
# This implements the various APIs for the various CI components we
# need. Very simplistic and prototype-y. All in one file for
# simplicity - production version will need to be done more carefully.
#
# To use: run this program with one of the following command lines:
#
#   ./this controller key.pub api.token
#   ./this vcsworker key.pub gitlab.token artifact.token
#   ./this deployer key.pub artifact.token ssh-target
#
# where
#
# - key.pub is a public key in ssh format for checking incoming access tokens
# - api.token is the name of a file containing the access token the
#   controller sends (as a Bearer token) to the vcsworker and deployer
# - gitlab.token is the name of a file containing a GitLab access token
# - artifact.token is the name of a file containing an access token;
#   the vcsworker stores it in the GitLab project as the
#   ARTIFACT_TOKEN variable, the deployer sends it as a Bearer token
#   to the artifact store
# - ssh-target is the scp destination prefix; the published name is
#   appended to it after a slash
#
# Logging is configured with logging.basicConfig; no log file argument
# is read from the command line.
#
# NOTE: The GitLab instance and other configuration details are
# hardcoded in this version. This will be fixed, as well as the 418
# HTTP status code as a generic error code.
#
# NOTE: You should run this behind haproxy or a similar TLS provider,
# which forwards requests to localhost.


import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import time
import urllib.parse

import Crypto.PublicKey.RSA
import bottle
import jwt


HOST = '127.0.0.1'
PORT = 2222

USAGE = '''Usage:
  controller KEY.PUB API.TOKEN
  vcsworker KEY.PUB GITLAB.TOKEN ARTIFACT.TOKEN
  deployer KEY.PUB ARTIFACT.TOKEN SSH-TARGET'''


class TokenParser:

    '''Parse an incoming access token (signed JWT)'''

    # Note that if we need to, for performance, we can cache the parse
    # results here. But there's no point in doing that unless it
    # becomes necessary.

    def __init__(self, pubkey):
        '''Remember the public key object used to verify tokens'''
        self._pubkey = pubkey

    def parse_token(self, token_text):
        '''Decode token_text and return its claims

        Audience verification is explicitly disabled. Whatever
        jwt.decode raises for a bad token propagates to the caller.

        '''

        # NOTE(review): no algorithms= argument is passed. PyJWT 2.x
        # requires one; confirm which signing algorithm the token
        # issuer uses before pinning it here.
        return jwt.decode(
            token_text, key=self._pubkey.exportKey('OpenSSH'),
            audience=None, options={'verify_aud': False})


class AccessChecker:

    '''Given request headers and required scopes, is a request allowed?'''

    def __init__(self, pubkey):
        '''Create a checker that verifies tokens against pubkey'''
        self._parser = TokenParser(pubkey)

    def access_is_allowed(self, headers, required_scopes):
        '''Return True if the request headers grant all required scopes

        A request without a usable token is allowed only when
        required_scopes is empty. Otherwise every required scope must
        be listed in the token's space-separated "scope" claim.

        '''

        token = self._get_token(headers)
        logging.debug('Access token %r', token)

        if token is None:
            if required_scopes:
                logging.error('No valid access token')
                return False
            return True

        # Compare against None above, not truthiness: a valid token
        # with no claims at all must still fail the scope check.
        scopes = token.get('scope', '').split()
        missing = set(required_scopes).difference(scopes)
        if missing:
            logging.error(
                'Required scopes that are missing from token: %r', missing)
            return False

        return True

    def _get_token(self, headers):
        '''Parse an access token or return None if it's bad'''

        token_text = self._get_token_text(headers)
        if token_text is None:
            return None

        try:
            return self._parser.parse_token(token_text)
        except jwt.InvalidTokenError as e:
            # A malformed, expired or badly signed token is a denied
            # request, not an internal server error.
            logging.error('Access token is not valid: %s', str(e))
            return None

    def _get_token_text(self, headers):
        '''Extract access token from request headers or None if not there'''

        v = headers.get('Authorization', '')
        if not v:
            logging.error('No Authorization header')
            return None

        words = v.split()
        if len(words) != 2:
            logging.error('Authorization header does not contain two words')
            return None

        keyword, token_text = words
        if keyword.lower() != 'bearer':
            logging.error(
                'Authorization header does not contain a Bearer token')
            return None

        logging.debug(
            'Got an access token from Authorization header: %s', token_text)
        return token_text


class AccessCheckerPlugin:

    '''Bottle plugin that checks access before calling a route callback'''

    def __init__(self, checker):
        '''Remember the AccessChecker used to authorize requests'''
        self._checker = checker

    def apply(self, callback, route):
        '''Wrap callback so that it only runs for authorized requests

        The required scopes are read from the route's configuration.
        A scopes value of None means no authorization is needed.

        '''

        def access_checker_wrapper(*args, **kwargs):
            request = bottle.request
            try:
                logging.debug(
                    'AccessCheckerPlugin: checking if access is allowed')
                scopes = route['config']['scopes']
                authz_needed = scopes is not None
                logging.debug('authz_needed: %r', authz_needed)
                if authz_needed:
                    logging.debug(
                        'checking if access is allowed: %r', scopes)
                    allowed = self._checker.access_is_allowed(
                        request.headers, scopes)
                    if not allowed:
                        logging.error(
                            'Request denied %s %s',
                            request.method, request.path)
                        return bottle.HTTPError(400)

                logging.debug(
                    'AccessCheckerPlugin: access is allowed, '
                    'calling callback')
                return callback(*args, **kwargs)
            except bottle.HTTPResponse:
                # The callback chose its own HTTP response (for
                # example HTTPError(400) for a bad request); let
                # bottle deliver it instead of masking it as a 500.
                raise
            except Exception as e:
                logging.error(
                    'Could not handle request: %s %s: %s',
                    request.method, request.path, str(e))
                return bottle.HTTPError(500)

        return access_checker_wrapper


class API:

    '''Base class for simple HTTP APIs

    Override the get_routes method to use this. Call setup() method to
    set up routes and such, before actually running.

    '''

    def __init__(self):
        self._checker = None
        self.app = None

    def setup(self, app, token_pubkey):
        '''Install this API's routes, with access checking, on app'''
        self._checker = AccessChecker(token_pubkey)
        self.app = app
        self._plugin = AccessCheckerPlugin(self._checker)
        self._add_routes(app, self.get_routes())

    def get_routes(self):
        '''Return a list of dicts with keys method, path, func, scopes'''
        raise NotImplementedError()

    def _add_routes(self, app, routes):
        '''Register each route description with the bottle application'''
        for r in routes:
            scopes = r['scopes']
            # Explicit check instead of assert, so that it still runs
            # when Python is started with -O.
            if scopes is not None and not isinstance(scopes, list):
                raise TypeError(
                    'scopes for %s must be a list or None' % r['path'])
            route = {
                'method': r['method'],
                'path': r['path'],
                'callback': r['func'],
                'apply': self._plugin,
                'scopes': scopes,
            }
            app.route(**route)


class VCSWorker(API):

    '''A VCSWorker API'''

    # Timeouts, in seconds, passed to runcmd.
    MAX_CLONE_TIME = 1
    MAX_REMOVE_TIME = 60
    MAX_CREATE_REPO_TIME = 5
    MAX_SET_WEBHOOK_TIME = 60
    MAX_PUSH_TIME = 60

    GITLAB_DOMAIN = 'wmf-gitlab3.vm.liw.fi'
    GITLAB_PROJECT = 'liw'
    WEBHOOK_URL = 'https://wmf2-controller.vm.liw.fi/webhook'

    def __init__(self, gitlab_token, artifact_token):
        '''Remember the tokens and create a temporary workspace'''
        super().__init__()
        self._gitlab_token = gitlab_token
        self._artifact_token = artifact_token
        self._tmpdir = tempfile.mkdtemp()
        logging.info('Workspace: %s', self._tmpdir)

    def get_routes(self):
        '''Return the route descriptions for this API'''
        return [
            {
                'method': 'POST',
                'path': '/updaterepo',
                'func': self._update_repo,
                'scopes': ['update-repo'],
            },
        ]

    def _update_repo(self):
        '''Copy a git repository into GitLab, as described by the request

        The JSON request body must have the keys "git" (URL to clone),
        "ref" (branch to clone and push) and "gitlab" (repository name
        in GitLab). The steps run in order and stop at the first one
        that fails.

        '''

        spec = bottle.request.json
        logging.info('Updating repository: %r', spec)

        url = spec['git']
        ref = spec['ref']
        name = spec['gitlab']

        # NOTE(review): name comes from the request and is joined into
        # a path that _clone may remove; consider rejecting names that
        # contain path separators.
        dirname = os.path.join(self._tmpdir, name)

        T = self._gitlab_token
        D = self.GITLAB_DOMAIN
        P = self.GITLAB_PROJECT
        key = 'ARTIFACT_TOKEN'
        value = self._artifact_token

        def update():
            return (self._clone(url, ref, dirname) and
                    self._remove(T, D, P, name) and
                    self._create_repo(T, D, name) and
                    self._list_vars(T, D, P, name) and
                    self._set_var(T, D, P, name, key, value) and
                    self._list_vars(T, D, P, name) and
                    self._add_job_webhook(T, D, P, name) and
                    self._push(dirname, D, P, ref, name))

        if update():
            return 'Repository copied successfully\n'

        logging.error('Something went wrong when copying repository')
        return bottle.HTTPError(418)

    def _clone(self, url, ref, dirname):
        '''Clone branch ref of url into dirname, replacing any old copy'''
        if os.path.exists(dirname):
            logging.debug('Removing %s', dirname)
            shutil.rmtree(dirname)
        argv = ['git', 'clone', '-q', '-b', ref, url, dirname]
        return runcmd('.', argv, self.MAX_CLONE_TIME)

    def _remove(self, token, gitlab_domain, gitlab_project, name):
        '''Ask GitLab to delete the project, then wait five seconds'''
        snippet = urllib.parse.quote('%s/%s' % (gitlab_project, name), safe='')
        url = 'https://%s/api/v4/projects/%s' % (gitlab_domain, snippet)
        argv = ['curl', '-HPRIVATE-TOKEN: %s' % token, '-X', 'DELETE', url]
        result = runcmd('.', argv, self.MAX_REMOVE_TIME)
        # Presumably gives GitLab time to finish the deletion before
        # the project is re-created - TODO confirm.
        time.sleep(5)
        return result

    def _create_repo(self, token, gitlab_domain, name):
        '''Create a GitLab project called name; return True on success'''
        logging.info('Creating repository %s', name)
        url = 'https://%s/api/v4/projects' % gitlab_domain
        argv = [
            'curl', url,
            '-sv',
            '-X', 'POST',
            '-d', 'name=%s' % name,
            '-HPRIVATE-TOKEN: %s' % token,
        ]
        return runcmd('.', argv, self.MAX_CREATE_REPO_TIME)

    def _list_vars(self, token, gitlab_domain, gitlab_project, name):
        '''Request the project's variables from GitLab; always return True

        runcmd only reports whether the command succeeded, so what is
        logged here is that success flag, not the variables themselves.
        A failure does not stop the update sequence.

        '''

        logging.info('Getting variables for %s/%s', gitlab_project, name)
        name = urllib.parse.quote('%s/%s' % (gitlab_project, name), safe='')
        url = 'https://%s/api/v4/projects/%s/variables' % (gitlab_domain, name)
        argv = ['curl', url, '-sv', '-HPRIVATE-TOKEN: %s' % token]
        output = runcmd('.', argv, self.MAX_CREATE_REPO_TIME)
        logging.info('Variables: %r', output)
        return True

    def _set_var(self, token, gitlab_domain, gitlab_project, name, key, value):
        '''Create variable key=value in the GitLab project'''
        logging.info(
            'Setting variable for %s/%s: %s=%s',
            gitlab_project, name, key, value)
        name = urllib.parse.quote('%s/%s' % (gitlab_project, name), safe='')
        url = 'https://%s/api/v4/projects/%s/variables' % (gitlab_domain, name)
        argv = [
            'curl', url,
            '-sv',
            '-X', 'POST',
            '-d', 'key=%s' % key,
            '-d', 'value=%s' % value,
            '-HPRIVATE-TOKEN: %s' % token,
        ]
        return runcmd('.', argv, self.MAX_CREATE_REPO_TIME)

    def _add_job_webhook(self, token, gitlab_domain, gitlab_project, name):
        '''Add a job-events webhook pointing at WEBHOOK_URL'''
        webhook_url = self.WEBHOOK_URL
        logging.info('Adding webhook for jobs: %s', webhook_url)
        name = urllib.parse.quote('%s/%s' % (gitlab_project, name), safe='')
        url = 'https://%s/api/v4/projects/%s/hooks' % (gitlab_domain, name)
        argv = [
            'curl', url,
            '-sv',
            '-X', 'POST',
            '-d', 'url=%s' % webhook_url,
            '-d', 'push_events=false',
            '-d', 'job_events=true',
            '-HPRIVATE-TOKEN: %s' % token,
        ]
        return runcmd('.', argv, self.MAX_SET_WEBHOOK_TIME)

    def _push(self, dirname, gitlab_domain, gitlab_project, ref, name):
        '''Push ref from the clone in dirname to GitLab's master branch'''
        logging.info('Pushing %s to %s as %s', dirname, gitlab_domain, name)
        url = 'ssh://git@%s/%s/%s.git' % (gitlab_domain, gitlab_project, name)
        argv = ['git', 'push', url, '%s:master' % ref]
        return runcmd(dirname, argv, self.MAX_PUSH_TIME)


class Deployer(API):

    '''A deployment worker API'''

    # Timeouts, in seconds, passed to runcmd.
    MAX_DOWNLOAD_TIME = 10
    MAX_UPLOAD_TIME = 60

    ARTIFACT_STORE = 'wmf2-artifacts.vm.liw.fi'

    def __init__(self, artifact_token, ssh_target):
        '''Remember the artifact store token and the scp target prefix'''
        super().__init__()
        self._artifact_token = artifact_token
        self._ssh_target = ssh_target

    def get_routes(self):
        '''Return the route descriptions for this API'''
        return [
            {
                'method': 'POST',
                'path': '/publish',
                'func': self._publish,
                'scopes': ['publish'],
            },
        ]

    def _publish(self):
        '''Download an artifact and copy it to the ssh target

        The JSON request body must have the keys "artifact_id" and
        "published_name". The artifact goes via a temporary file,
        which is always removed afterwards.

        '''

        spec = bottle.request.json
        logging.info('Publishing: %r', spec)

        artifact_id = spec['artifact_id']
        published_name = spec['published_name']

        fd, filename = tempfile.mkstemp()
        os.close(fd)

        try:
            ssh_target = '{}/{}'.format(self._ssh_target, published_name)
            # Both steps report failure by returning False rather than
            # raising, so the results must be checked explicitly.
            ok = (
                self._get_artifact(
                    self._artifact_token, artifact_id, filename) and
                self._publish_via_ssh(filename, ssh_target))
        except Exception as e:
            logging.error('Deployment failed: %s', str(e))
            return bottle.HTTPError(418)
        finally:
            os.remove(filename)

        if not ok:
            logging.error('Deployment failed')
            return bottle.HTTPError(418)

        return 'Deployment OK\n'

    def _get_artifact(self, token, artifact_id, filename):
        '''Download artifact_id from the artifact store into filename'''
        logging.info('Getting artifact %s', artifact_id)
        url = 'https://%s/blobs/%s' % (self.ARTIFACT_STORE, artifact_id)
        argv = [
            'curl', url,
            '-o', filename,
            '-HAuthorization: Bearer %s' % token,
        ]
        return runcmd('.', argv, self.MAX_DOWNLOAD_TIME)

    def _publish_via_ssh(self, filename, ssh_target):
        '''Make filename read-only and copy it to ssh_target with scp'''
        logging.info('Copying %s to %s', filename, ssh_target)
        os.chmod(filename, 0o444)
        argv = [
            'scp',
            '-q',
            filename,
            ssh_target,
        ]
        return runcmd('.', argv, self.MAX_UPLOAD_TIME)


class Controller(API):

    '''A dummy controller API'''

    VCSWORKER = 'https://wmf2-vcsworker.vm.liw.fi'
    DEPLOYER = 'https://wmf2-deployer.vm.liw.fi'
    MAX_TRIGGER_TIME = 60

    def __init__(self, token):
        '''Remember the outgoing access token; start with no builds'''
        super().__init__()
        self._token = token
        self._builds = {}

    def get_routes(self):
        '''Return the route descriptions for this API'''
        return [
            {
                'method': 'GET',
                'path': '/status',
                'func': self._status,
                'scopes': ['status'],
            },
            {
                'method': 'POST',
                'path': '/trigger',
                'func': self._trigger,
                'scopes': ['trigger'],
            },
            {
                'method': 'POST',
                'path': '/webhook',
                'func': self._webhook,
                'scopes': None,
            },
        ]

    def _status(self):
        '''Return the known builds and their statuses, sorted by name'''
        return {
            'builds': {
                name: self._builds[name] for name in sorted(self._builds)
            },
        }

    def _trigger(self):
        '''Forward the request's build spec to the VCS worker

        The JSON request body must have a "gitlab" key naming the
        repository. The build is recorded with status None until a
        webhook call reports something else.

        '''

        spec = bottle.request.json
        if spec is None:
            logging.error('Request has null as spec')
            logging.debug('Request: %r', dict(bottle.request.headers))
            logging.debug('Request: %r', bottle.request.body.read())
            raise bottle.HTTPError(400)

        name = spec.get('gitlab')
        if name is None:
            logging.error('Request spec has no gitlab')
            raise bottle.HTTPError(400)

        logging.info('Triggering build of %s', name)
        self._builds[name] = None

        url = '{}/updaterepo'.format(self.VCSWORKER)
        argv = [
            'curl',
            '-HAuthorization: Bearer {}'.format(self._token),
            '-HContent-Type: application/json',
            '--data-binary', json.dumps(spec),
            url,
        ]
        ok = runcmd('.', argv, self.MAX_TRIGGER_TIME)
        if not ok:
            raise bottle.HTTPError(500, 'Error triggering build')
        return 'Triggered build'

    def _webhook(self):
        '''Record a build status notification; deploy on success

        Only notifications whose "object_kind" is "build" and that
        have a "repository" entry are acted on.

        '''

        obj = bottle.request.json
        if obj is None:
            logging.error('Webhook request has no JSON body')
            raise bottle.HTTPError(400)

        kind = obj.get('object_kind')
        status = obj.get('build_status')
        repo = obj.get('repository')
        if kind == 'build' and repo is not None:
            name = repo.get('name')
            logging.info('Repository %s build status %s', name, status)
            self._builds[name] = status
            if status == 'success':
                self._deploy(name)

    def _deploy(self, name):
        '''Ask the deployer to publish the artifact called name'''
        spec = {
            'artifact_id': name,
            'published_name': name,
        }
        url = '{}/publish'.format(self.DEPLOYER)
        argv = [
            'curl',
            '-HAuthorization: Bearer {}'.format(self._token),
            '-HContent-Type: application/json',
            '-X', 'POST',
            '--data-binary', json.dumps(spec),
            url,
        ]
        ok = runcmd('.', argv, self.MAX_TRIGGER_TIME)
        if not ok:
            raise bottle.HTTPError(500, 'Error deploying')
        return 'Deployed\n'


def runcmd(cwd, argv, timeout):
    '''Run argv in directory cwd; return True if it exits with zero

    Returns False if the command times out (timeout is in seconds),
    cannot be run, or exits with a non-zero code. The command's
    output is only logged, never returned.

    NOTE(review): argv is logged verbatim, including any credentials
    embedded in it.

    '''

    logging.info('Running command: %r', argv)
    try:
        p = subprocess.run(
            argv, cwd=cwd, timeout=timeout,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.TimeoutExpired:
        logging.error('Command took too long (timeout %r)', timeout)
        return False
    except Exception as e:
        logging.error('Error while running command: %s', str(e))
        return False

    logging.debug('stdout: %r', p.stdout)
    if p.returncode != 0:
        logging.error('Command failed: %r', argv)
        logging.error('exit code: %d', p.returncode)
        logging.error('stdout: %r', p.stdout)
        logging.error('stderr: %r', p.stderr)
        return False

    logging.info('Command succeeded')
    return True


def setup_logging():
    '''Configure logging at DEBUG level and log a start-up message'''
    logging.basicConfig(
        level=logging.DEBUG, format='%(levelname)s %(message)s')
    logging.info('API server starting')


def get_key_from_file(filename):
    '''Read an RSA key from filename and return the key object'''
    with open(filename) as f:
        key_text = f.read()
    return Crypto.PublicKey.RSA.importKey(key_text)


def get_token_from_file(filename):
    '''Return the contents of filename, without surrounding whitespace'''
    with open(filename) as f:
        return f.read().strip()


def main():
    '''Parse the command line, build the chosen API and serve it'''

    args = sys.argv[1:]
    if len(args) < 2:
        sys.exit(USAGE)
    cmd = args.pop(0)
    pubkey_filename = args.pop(0)

    setup_logging()

    try:
        if cmd == 'controller':
            token = get_token_from_file(args.pop(0))
            api = Controller(token)
        elif cmd == 'vcsworker':
            gitlab_token = get_token_from_file(args.pop(0))
            artifact_token = get_token_from_file(args.pop(0))
            api = VCSWorker(gitlab_token, artifact_token)
        elif cmd == 'deployer':
            artifact_token = get_token_from_file(args.pop(0))
            ssh_target = args.pop(0)
            api = Deployer(artifact_token, ssh_target)
        else:
            sys.exit('Unknown command %s' % cmd)
    except IndexError:
        # args.pop(0) ran out of command line arguments.
        sys.exit(USAGE)

    app = bottle.Bottle()
    pubkey = get_key_from_file(pubkey_filename)
    api.setup(app, pubkey)

    try:
        app.run(host=HOST, port=PORT, quiet=True)
    except SystemExit:
        logging.info('Terminating normally')
    except Exception as e:
        logging.error('Caught exception %s', str(e))
        sys.exit(1)


if __name__ == '__main__':
    main()