# Copyright (C) 2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.


import base64
import copy
import glob
import json
import logging
import os
import tempfile


import ick2


class UnknownStepError(Exception):
    """Raised when a step specification does not match any known action."""


class ActionFactory:
    """Create action objects, and the environments they run in.

    The factory remembers the build id, controller client, token, blob
    URL function and extra environment variables, and hands them to
    every action it creates.
    """

    # Maps the value of a step's "where" field to the environment class.
    _classes = {
        'host': ick2.HostEnvironment,
        'chroot': ick2.ChrootEnvironment,
        'container': ick2.ContainerEnvironment,
    }

    def __init__(self, build_id, systree, workspace_area, reporter):
        self._cc = None
        self._build_id = build_id
        self._systree = systree
        self._workspace_area = workspace_area
        self._reporter = reporter
        self._token = None
        self._blob_url = None
        self._extra_env = {}

    def add_env_var(self, name, value):  # pragma: no cover
        """Remember an extra variable to pass to created environments."""
        self._extra_env[name] = value

    def set_controller_client(self, cc):  # pragma: no cover
        """Set the controller client handed to created actions."""
        self._cc = cc

    def set_token(self, token):
        """Set the token handed to created actions."""
        self._token = token

    def get_token(self):
        """Return the token set with set_token, or None."""
        return self._token

    def set_blob_url_func(self, func):
        """Set the callable that maps a blob name to its URL."""
        self._blob_url = func

    def get_blob_url_func(self):
        """Return the callable set with set_blob_url_func, or None."""
        return self._blob_url

    def get_allowed_environments(self):
        """Return the list of accepted values for a step's "where"."""
        return list(self._classes.keys())

    def get_workspace_area(self):
        """Return a WorkspaceArea rooted at the configured directory."""
        area = ick2.WorkspaceArea()
        area.set_root(self._workspace_area)
        return area

    def create_environment(self, spec, project_name):
        """Create the environment named by the step's "where" field.

        A missing "where" means 'host'. The environment is given the
        workspace directory created for project_name and the extra
        environment variables collected so far.
        """
        where = spec.get('where', 'host')
        assert where in self.get_allowed_environments()
        env_class = self._classes[where]
        area = self.get_workspace_area()
        ws = area.create_workspace(project_name)
        env = env_class(self._systree, ws.get_directory(), self._reporter)
        env.set_extra_env(self._extra_env)
        return env

    def create_action(self, spec, project_name):
        """Create a fully configured action object for a step.

        Raises UnknownStepError if the step matches no known action.
        """
        env = self.create_environment(spec, project_name)
        action = self._create_action_object(env, spec)
        action.set_controller_client(self._cc)
        action.set_build_id(self._build_id)
        action.set_token(self.get_token())
        action.set_blob_url_func(self.get_blob_url_func())
        return action

    def _create_action_object(self, env, spec):
        """Pick the action class for a step and instantiate it with env.

        Steps are matched in order: a 'shell', 'python' or 'debootstrap'
        key; then the value of 'archive'; then the value of 'action'.
        An unrecognised 'archive' value falls through to the 'action'
        lookup. Raises UnknownStepError when nothing matches.
        """
        rules = [
            ('shell', ShellAction),
            ('python', PythonAction),
            ('debootstrap', DebootstrapAction),
        ]

        for key, klass in rules:
            if key in spec:
                return klass(env)

        if 'archive' in spec:
            rules2 = {
                'workspace': ArchiveWorkspaceAction,
                'systree': ArchiveSystreeAction,
            }
            kind = spec['archive']
            klass = rules2.get(kind)
            if klass:
                return klass(env)

        if 'action' in spec:
            rules2 = {
                'populate_systree': PopulateSystreeAction,
                'populate_workspace': PopulateWorkspaceAction,
                'create_workspace': CreateWorkspaceAction,
                'git': GitAction,
                'git_mirror': GitMirrorAction,
                'rsync': RsyncAction,
                'dput': DputAction,
                'notify': NotifyAction,
            }
            kind = spec['action']
            klass = rules2.get(kind)
            if klass:
                return klass(env)

        raise UnknownStepError('Unknown action %r' % spec)


class Action:  # pragma: no cover
    """Base class for all actions.

    Subclasses implement encode_parameters and execute. execute
    receives the build parameters and the step specification; most
    implementations in this module return an integer exit code, with 0
    meaning success.
    """

    def __init__(self, env):
        self._env = env
        self._cc = None
        self._build_id = None
        self._token = None
        self._blob_url = None

    def set_controller_client(self, cc):
        self._cc = cc

    def get_controller_client(self):
        return self._cc

    def set_build_id(self, build_id):
        self._build_id = build_id

    def get_build_id(self):
        return self._build_id

    def set_token(self, token):
        self._token = token

    def get_token(self):
        return self._token

    def set_blob_url_func(self, func):
        self._blob_url = func

    def get_blob_upload_url(self, blob_name):
        """Return the URL for a blob, using the configured function."""
        return self._blob_url(blob_name)

    def get_env(self):
        return self._env

    def get_workspace_area(self):
        """Return a WorkspaceArea rooted at the env's workspace directory."""
        env = self.get_env()
        area = ick2.WorkspaceArea()
        area.set_root(env.get_workspace_directory())
        return area

    # NOTE(review): "parsms" looks like a typo for "params"; the name is
    # kept so that the method signature stays unchanged.
    def encode_parameters(self, parsms):
        """Encode build parameters for the action; subclasses override."""
        raise NotImplementedError()

    def encode64(self, params):
        """Return params (a dict) as base64-encoded JSON text."""
        assert isinstance(params, dict)
        as_text = json.dumps(params)
        as_bytes = as_text.encode('UTF-8')
        as_base64 = base64.b64encode(as_bytes)
        return as_base64.decode('UTF-8')

    def decode64(self, encoded):
        """Reverse encode64: decode base64 JSON text into an object."""
        as_base64 = encoded.encode('UTF-8')
        as_bytes = base64.b64decode(as_base64)
        as_text = as_bytes.decode('UTF-8')
        return json.loads(as_text)

    def get_authz_headers(self):
        """Return HTTP headers carrying the token as a bearer token."""
        token = self.get_token()
        return {
            'Authorization': 'Bearer {}'.format(token),
        }

    def execute(self, params, step):
        """Run the action; subclasses override."""
        raise NotImplementedError()


class ShellAction(Action):
    """Run the step's 'shell' snippet with bash."""

    def encode_parameters(self, params):  # pragma: no cover
        """Return a shell function params() that prints params as JSON."""
        encoded = self.encode64(params)
        return 'params() { echo -n "%s" | base64 -d; }\n' % encoded

    def execute(self, params, step):
        """Run the snippet via 'bash -exuc' and return its exit code."""
        prefix = self.encode_parameters(params)
        snippet = step['shell']
        argv = ['bash', '-exuc', prefix + snippet]
        exit_code = self._env.runcmd(argv)
        self._env.report(exit_code, 'action finished\n')
        return exit_code


class PythonAction(Action):
    """Run the step's 'python' snippet with python3."""

    def encode_parameters(self, params):  # pragma: no cover
        """Return Python source to prepend to the snippet.

        The prefix defines a global 'params' (the decoded build
        parameters) and the helpers RUN, OUT, ERR and OUTERR, which
        wrap subprocess.run.
        """
        encoded = self.encode64(params)
        # OUTERR captures stdout and stderr through separate pipes so
        # that it can return both; redirecting stderr into stdout would
        # leave x.stderr as None and make the second decode fail.
        prefix = (
            'import base64, json, subprocess\n'
            'params = json.loads(base64.b64decode(\n'
            '    "{}").decode("utf8"))\n'
            'def RUN(*args, **kwargs):\n'
            '    print("Executing:", args, kwargs)\n'
            '    if "check" not in kwargs:\n'
            '        kwargs["check"] = True\n'
            '    return subprocess.run(args, **kwargs)\n'
            'def OUT(*args, **kwargs):\n'
            '    x = RUN(*args, stdout=subprocess.PIPE, **kwargs)\n'
            '    return x.stdout.decode("UTF-8")\n'
            'def ERR(*args, **kwargs):\n'
            '    x = RUN(*args, stderr=subprocess.PIPE, check=False, **kwargs)\n'
            '    return x.stderr.decode("UTF-8")\n'
            'def OUTERR(*args, **kwargs):\n'
            '    x = RUN(*args, stdout=subprocess.PIPE,\n'
            '            stderr=subprocess.PIPE, check=False, **kwargs)\n'
            '    return x.stdout.decode("UTF-8"), x.stderr.decode("UTF-8")\n'
        ).format(encoded)
        return prefix

    def execute(self, params, step):
        """Run the snippet via 'python3 -c' and return its exit code."""
        prefix = self.encode_parameters(params)
        snippet = step['python']
        argv = ['python3', '-c', prefix + '\n' + snippet]
        exit_code = self._env.runcmd(argv)
        self._env.report(exit_code, 'action finished\n')
        return exit_code


class DebootstrapAction(Action):
    """Run debootstrap in the workspace directory."""

    default_mirror = 'http://deb.debian.org/debian'

    def encode_parameters(self, params):  # pragma: no cover
        pass

    def execute(self, params, step):
        """Run 'sudo debootstrap' and return its exit code.

        The suite comes from the step's 'debootstrap' field; when that
        is missing or 'auto', the 'debian_codename' parameter is used.
        The mirror comes from the step's 'mirror' field, falling back
        to default_mirror.
        """
        suite = step.get('debootstrap')
        if suite is None or suite == 'auto':
            suite = params['debian_codename']
        mirror = step.get('mirror', self.default_mirror)

        env = self.get_env()
        workspace = env.get_workspace_directory()
        argv = ['sudo', 'debootstrap', suite, '.', mirror]
        exit_code = self._env.host_runcmd(argv, cwd=workspace)
        self._env.report(exit_code, 'action finished\n')
        return exit_code


class CreateWorkspaceAction(Action):
    """Empty the workspace directory."""

    def encode_parameters(self, params):  # pragma: no cover
        pass

    def execute(self, params, step):
        """Delete everything in the workspace directory; always return 0."""
        env = self.get_env()
        dirname = env.get_workspace_directory()
        make_directory_empty(env, dirname)
        self._env.report(0, 'Created or emptied workspace %s\n' % dirname)
        return 0


class ArchiveBaseAction(Action):  # pragma: no cover
    """Base class for actions that upload a directory as a tarball."""

    def get_dirname(self, env):
        """Return the directory to archive; subclasses override."""
        raise NotImplementedError()

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Create a tarball of the directory and upload it with curl.

        The artifact name is read from the parameter named by the
        step's 'name_from' field (default 'artifact_name'). If the step
        has 'globs', only matching names are archived; otherwise the
        whole directory is. Returns 1 if there is no artifact name,
        otherwise the exit code of tar (on failure) or of curl.
        """
        env = self.get_env()
        dirname = self.get_dirname(env)

        name_from = step.get('name_from', 'artifact_name')
        blob_name = params.get(name_from)
        if not blob_name:
            env.report(1, 'No artifact_name parameter\n')
            return 1
        env.report(None, 'Creating new artifact named {}\n'.format(blob_name))
        env.report(None, 'Artifact will be created from {}\n'.format(dirname))

        globs = step.get('globs')
        if globs is None:
            names = ['.']
        else:
            names = self.match_globs(dirname, globs)

        url = self.get_blob_upload_url(blob_name)
        headers = self.get_authz_headers()

        self._env.report(None, 'Creating tarball\n')
        fd, tarball = tempfile.mkstemp()
        os.close(fd)
        # The finally clause removes the temporary tarball on every
        # path out of this method, including exceptions.
        try:
            tar = ['sudo', 'tar', '-zvcf', tarball, '-C', dirname] + names
            exit_code = self._env.host_runcmd(tar)
            if exit_code != 0:
                self._env.report(exit_code, 'Tarball generation failed\n')
                return exit_code
            self._env.report(None, 'Tarball generation finished OK\n')

            self._env.report(None, 'Uploading tarball to artifact store\n')
            curl = ['curl', '-sk', '-T', tarball] + [
                '-H{}:{}'.format(header, value)
                for header, value in headers.items()
            ] + [url]
            exit_code = self._env.host_runcmd(curl)
            self._env.report(
                exit_code,
                'curl upload finished (exit code %s)\n' % exit_code)
        finally:
            os.remove(tarball)

        return exit_code

    def match_globs(self, workspace, globs):
        """Return normalised paths under workspace matching any pattern."""
        names = []
        for pat in globs:
            abspat = os.path.join(workspace, './' + pat)
            for name in glob.glob(abspat):
                names.append(os.path.normpath(name))
        return names


class ArchiveSystreeAction(ArchiveBaseAction):  # pragma: no cover
    """Archive the environment's systree directory."""

    def get_dirname(self, env):
        return env.get_systree_directory()


class ArchiveWorkspaceAction(ArchiveBaseAction):  # pragma: no cover
    """Archive the environment's workspace directory."""

    def get_dirname(self, env):
        return env.get_workspace_directory()


class PopulateActionBase(Action):  # pragma: no cover
    """Base class for actions that download and unpack an artifact."""

    # Name of the step field, and of the fallback parameter, that hold
    # the artifact name. Subclasses set both.
    step_field = None
    param_name = None

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Empty the target directory and unpack the artifact into it.

        The artifact name comes from the step field named by
        step_field. When that is missing or 'auto', it is read from the
        parameter named by the step's 'name_from' field (default
        param_name). Returns 1 if no name is found, otherwise the
        download exit code as adjusted by mangle_exit_code.
        """
        env = self.get_env()

        name = step.get(self.step_field)
        if not name or name == 'auto':
            name_name = step.get('name_from', self.param_name)
            name = params.get(name_name)
            if not name:
                msg = '{} in action is {}, but no {} parameter\n'.format(
                    self.step_field, name, name_name)
                env.report(1, msg)
                return 1

        env.report(None, 'Using {} for artifact name\n'.format(name))

        dirname = self.get_unpack_directory(env)
        make_directory_empty(env, dirname)
        exit_code = self.download_and_unpack_artifact(name, dirname)
        new_code = self.mangle_exit_code(exit_code)
        env.report(
            new_code, '{} finished (exit_code {} -> {})\n'.format(
                str(self), exit_code, new_code))
        return new_code

    def get_unpack_directory(self, env):
        """Return the directory to unpack into; subclasses override."""
        raise NotImplementedError()

    def mangle_exit_code(self, exit_code):
        """Map the download exit code to the action's exit code."""
        raise NotImplementedError()

    def download_and_unpack_artifact(self, name, dirname):
        """Run curl and tar together to unpack the artifact into dirname.

        Both command lines are passed to one env.host_runcmd call, and
        its result is returned.
        """
        url = self.get_blob_upload_url(name)
        headers = self.get_authz_headers()
        curl = ['curl', '-sk'] + [
            '-H{}:{}'.format(header, value)
            for header, value in headers.items()
        ] + [url]
        untar = ['sudo', 'tar', '-zxf', '-', '-C', dirname]

        env = self.get_env()
        return env.host_runcmd(curl, untar)


class PopulateSystreeAction(PopulateActionBase):  # pragma: no cover
    """Unpack an artifact into the systree directory."""

    step_field = 'systree_name'
    param_name = 'systree_name'

    def __str__(self):
        return 'populate-systree'

    def get_unpack_directory(self, env):
        return env.get_systree_directory()

    def mangle_exit_code(self, exit_code):
        return exit_code


class PopulateWorkspaceAction(PopulateActionBase):  # pragma: no cover
    """Unpack an artifact into the workspace directory."""

    step_field = 'workspace_name'
    param_name = 'workspace_name'

    def __str__(self):
        return 'populate-workspace'

    def get_unpack_directory(self, env):
        return env.get_workspace_directory()

    def mangle_exit_code(self, exit_code):
        # We never fail this action. The artifact might not exist.
        return 0


class GitAction(Action):  # pragma: no cover
    """Clone a git repository into the workspace, or update the clone."""

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Clone or update the repository and return git's exit code.

        Reads the 'git_dir' and 'git_url' parameters. Returns 1,
        without running git, if either is missing or if git_dir is
        absolute or contains '..'.
        """
        env = self.get_env()
        workspace = env.get_workspace_directory()

        git_dir = params.get('git_dir')
        if git_dir is None:
            env.report(1, 'git_dir not provided\n')
            return 1
        if git_dir.startswith('/') or '..' in git_dir:
            env.report(1, 'git_dir not acceptable\n')
            return 1

        git_url = params.get('git_url')
        if git_url is None:
            env.report(1, 'git_url not provided\n')
            return 1

        pathname = os.path.join(workspace, git_dir)
        if os.path.exists(pathname):
            argv = ['git', 'remote', '-v', 'update', '--prune']
            cwd = pathname
        else:
            argv = ['git', 'clone', '-v', git_url, git_dir]
            cwd = workspace

        exit_code = env.host_runcmd(argv, cwd=cwd)
        env.report(exit_code, 'git finished (exit code %d)\n' % exit_code)
        return exit_code


class GitMirrorAction(Action):  # pragma: no cover
    """Mirror git repositories into '.mirrors' in the workspace."""

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Mirror every repository listed in the 'sources' parameter.

        Returns 1 if the step's 'where' is not 'host', if 'sources' is
        missing, or if mirroring raises an exception; otherwise the
        exit code from git_mirror.
        """
        env = self.get_env()
        workspace = env.get_workspace_directory()
        mirrors = os.path.join(workspace, '.mirrors')

        if step.get('where') != 'host':
            env.report(1, '"where" must be "host"\n')
            return 1

        sources = params.get('sources')
        if sources is None:
            env.report(1, '"sources" parameter not provided\n')
            return 1

        # Any failure while mirroring is reported as a failed step
        # instead of propagating out of the action.
        try:
            exit_code = self.git_mirror(env, sources, mirrors)
        except Exception as e:
            env.report(1, 'Caught exception: {}\n'.format(e))
            return 1

        env.report(
            exit_code,
            'git mirror action finished (exit code %d)\n' % exit_code)
        return exit_code

    def git_mirror(self, env, sources, mirrors):
        """Mirror each source; return the first non-zero exit code, or 0."""
        if not os.path.exists(mirrors):
            env.report(None, 'mkdir {}\n'.format(mirrors))
            os.mkdir(mirrors)

        checked = self.check_sources(sources)
        for name, url in checked:
            exit_code = self.mirror(env, mirrors, name, url)
            if exit_code != 0:
                return exit_code

        return 0

    def check_sources(self, sources):
        """Return (name, repo) pairs for the sources.

        Raises Exception if a source lacks a 'name' or 'repo' field.
        """
        checked = []
        for source in sources:
            name = source.get('name')
            repo = source.get('repo')
            if name is None:
                raise Exception('source lacks "name" field: {}'.format(source))
            if repo is None:
                raise Exception('source lacks "repo" field: {}'.format(source))
            checked.append((name, repo))
        return checked

    def mirror(self, env, mirrors, name, url):
        """Create or update one mirror and return git's exit code."""
        env.report(None, 'git_mirror: mirrors: {}\n'.format(mirrors))
        env.report(None, 'git_mirror: name: {}\n'.format(name))
        env.report(None, 'git_mirror: url: {}\n'.format(url))
        dirname = os.path.join(mirrors, name)
        env.report(None, 'git_mirror: dirname: {}\n'.format(dirname))

        if os.path.exists(dirname):
            argv = ['git', 'remote', 'update', '--prune']
            cwd = dirname
        else:
            argv = ['git', 'clone', '--mirror', url, name]
            cwd = mirrors
            os.mkdir(dirname)

        env.report(None, 'Running: {} in {}\n'.format(argv, cwd))
        return env.host_runcmd(argv, cwd=cwd)


class RsyncAction(Action):  # pragma: no cover
    """Copy a workspace subdirectory to a remote target with rsync."""

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Run rsync and return its exit code.

        Reads the 'rsync_src' and 'rsync_target' parameters. Returns 1,
        without running rsync, if either is missing, if rsync_src is
        not an acceptable relative path, or if rsync_target does not
        contain ':'.
        """
        env = self.get_env()
        workspace = env.get_workspace_directory()

        rsync_src = params.get('rsync_src')
        if rsync_src is None:
            env.report(1, 'rsync_src not provided\n')
            return 1
        if not self._is_relative(rsync_src):
            env.report(1, 'rsync_src not acceptable\n')
            return 1

        rsync_target = params.get('rsync_target')
        if rsync_target is None:
            env.report(1, 'rsync_target not provided\n')
            return 1
        if not self._remote(rsync_target):
            env.report(1, 'rsync_target not acceptable\n')
            return 1

        argv = [
            'rsync', '-av', '--delete-after',
            './{}/.'.format(rsync_src),
            '{}/.'.format(rsync_target),
        ]
        exit_code = env.host_runcmd(argv, cwd=workspace)
        env.report(exit_code, 'rsync finished (exit code %d)\n' % exit_code)
        return exit_code

    def _is_relative(self, src):
        """Return True unless src is absolute or contains '../'."""
        if src.startswith('/'):
            return False
        if '../' in src:
            return False
        return True

    def _remote(self, target):
        """Return True if target contains a ':'."""
        return ':' in target


class DputAction(Action):  # pragma: no cover
    """Upload *.changes files from the workspace with dput."""

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Run dput with a temporary configuration; return its exit code.

        The APT server name comes from the controller client.
        """
        env = self.get_env()
        workspace = env.get_workspace_directory()

        apt_server = self._cc.get_apt_server()
        config = self.get_dput_config(apt_server)
        logging.debug('dput config:\n%s', config)
        filename = self.create_dput_config_file(config)

        # Remove the temporary configuration file (not the config
        # text) whether or not dput succeeds.
        try:
            argv = ['sh', '-c', 'dput -c {} ick *.changes'.format(filename)]
            exit_code = env.host_runcmd(argv, cwd=workspace)
            env.report(
                exit_code, 'dput finished (exit code %d)\n' % exit_code)
        finally:
            os.remove(filename)
        return exit_code

    def create_dput_config_file(self, config):
        """Write config to a new temporary file and return its name."""
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        with open(filename, "w") as f:
            f.write(config)
        return filename

    def get_dput_config(self, apt_server):
        """Return dput configuration text with an [ick] section."""
        template = '''\
[ick]
login = incoming
fqdn = {apt_server}
method = scp
incoming = /srv/apt/incoming
allow_unsigned_uploads = 1
check_version = 0
run_dinstall = 0
'''
        return template.format(apt_server=apt_server)


class NotifyAction(Action):  # pragma: no cover
    """Ask the controller to send notifications about a build."""

    def encode_parameters(self, params):
        pass

    # NOTE(review): unlike the other actions in this module, this
    # method returns None on every path rather than an exit code.
    # Confirm that callers accept that before changing it.
    def execute(self, params, step):
        """Notify the build's recipients, if it has a 'notify' parameter.

        The parameters are read from the build resource fetched via
        the controller client; the params argument is not consulted.
        """
        env = self.get_env()
        cc = self.get_controller_client()
        assert cc is not None
        build_id = self.get_build_id()

        env.report(None, 'Notifying about build ending\n')

        build_path = '/builds/{}'.format(build_id)
        build = cc.show(build_path)

        params = build.get('parameters', {})
        if 'notify' not in params:
            env.report(
                0,
                'NOT notifying about build ending: no "notify" parameter.\n')
            return

        recipients = params['notify']

        log = cc.get_log(build_id)
        log = log.decode('utf-8')

        notify = {
            'recipients': recipients,
            'build': self.mangle_build(build),
            'log': log,
        }

        cc.notify(notify)

        env.report(0, 'Notified about build {} ending\n'.format(build_id))

    def mangle_build(self, build):
        """Return a deep copy of build with a 'status' field added.

        The status is 'BUILDING' when exit_code is None, 'SUCCESS' when
        it is 0, and 'FAILED' otherwise.
        """
        b = copy.deepcopy(build)
        exit_code = build.get('exit_code')
        if exit_code is None:
            b['status'] = 'BUILDING'
        elif exit_code == 0:
            b['status'] = 'SUCCESS'
        else:
            b['status'] = 'FAILED'
        return b


def make_directory_empty(env, dirname):
    """Delete everything below dirname; return the command's result."""
    return env.runcmd(
        ['sudo', 'find', dirname, '-mindepth', '1', '-delete'])