# Copyright (C) 2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import base64
import copy
import glob
import json
import os
import tempfile

import ick2


class UnknownStepError(Exception):
    """Raised when a step specification matches no known action class."""


class ActionFactory:
    """Build the environment and the action object for a build step.

    The factory carries the per-build settings (build id, systree
    directory, workspace area root, reporter, token, blob URL function,
    extra environment variables) and hands them to every environment
    and action it creates.
    """

    # Maps the value of a step's 'where' field to an environment class.
    _classes = {
        'host': ick2.HostEnvironment,
        'chroot': ick2.ChrootEnvironment,
        'container': ick2.ContainerEnvironment,
    }

    def __init__(self, build_id, systree, workspace_area, reporter):
        self._cc = None
        self._build_id = build_id
        self._systree = systree
        self._workspace_area = workspace_area
        self._reporter = reporter
        self._token = None
        self._blob_url = None
        self._extra_env = {}

    def add_env_var(self, name, value):  # pragma: no cover
        """Remember an extra environment variable for new environments."""
        self._extra_env[name] = value

    def set_controller_client(self, cc):  # pragma: no cover
        """Set the controller client handed to every created action."""
        self._cc = cc

    def set_token(self, token):
        """Set the token handed to every created action."""
        self._token = token

    def get_token(self):
        """Return the token set with set_token, or None."""
        return self._token

    def set_blob_url_func(self, func):
        """Set the callable that maps a blob name to its URL."""
        self._blob_url = func

    def get_blob_url_func(self):
        """Return the callable set with set_blob_url_func, or None."""
        return self._blob_url

    def get_allowed_environments(self):
        """Return the list of accepted values for a step's 'where' field."""
        return list(self._classes.keys())

    def get_workspace_area(self):
        """Return an ick2.WorkspaceArea rooted at the configured area."""
        area = ick2.WorkspaceArea()
        area.set_root(self._workspace_area)
        return area

    def create_environment(self, spec, project_name):
        """Create the environment in which the step ``spec`` will run.

        The environment kind comes from ``spec['where']`` and defaults
        to 'host'. A workspace for ``project_name`` is created in the
        workspace area and its directory is given to the environment.

        Raises AssertionError if the requested kind is not allowed.
        """
        where = spec.get('where', 'host')
        # Kept as an assert so callers keep seeing AssertionError.
        assert where in self.get_allowed_environments()
        env_class = self._classes[where]
        area = self.get_workspace_area()
        ws = area.create_workspace(project_name)
        env = env_class(self._systree, ws.get_directory(), self._reporter)
        env.set_extra_env(self._extra_env)
        return env

    def create_action(self, spec, project_name):
        """Create a fully configured action object for the step ``spec``.

        Raises UnknownStepError if ``spec`` matches no known action.
        """
        env = self.create_environment(spec, project_name)
        action = self._create_action_object(env, spec)
        action.set_controller_client(self._cc)
        action.set_build_id(self._build_id)
        action.set_token(self.get_token())
        action.set_blob_url_func(self.get_blob_url_func())
        return action

    def _create_action_object(self, env, spec):
        """Pick the action class for ``spec`` and instantiate it.

        A step is matched first by the presence of one of the keys
        'shell', 'python', 'debootstrap' or 'archive' (in that order),
        then by the value of its 'action' key.
        """
        # Order matters: the first key present in the spec wins.
        rules = [
            ('shell', ShellAction),
            ('python', PythonAction),
            ('debootstrap', DebootstrapAction),
            ('archive', ArchiveWorkspaceAction),
        ]
        for key, klass in rules:
            if key in spec:
                return klass(env)

        if 'action' in spec:
            rules2 = {
                'populate_systree': PopulateSystreeAction,
                'populate_workspace': PopulateWorkspaceAction,
                'create_workspace': CreateWorkspaceAction,
                'git': GitAction,
                'rsync': RsyncAction,
                'dput': DputAction,
                'notify': NotifyAction,
            }
            kind = spec['action']
            klass = rules2.get(kind)
            if klass:
                return klass(env)

        raise UnknownStepError('Unknown action %r' % spec)


class Action:  # pragma: no cover
    """Base class for all actions.

    Subclasses implement encode_parameters and execute. ``execute``
    returns an integer exit code, where 0 means success.
    """

    def __init__(self, env):
        self._env = env
        self._cc = None
        self._build_id = None
        self._token = None
        self._blob_url = None

    def set_controller_client(self, cc):
        """Set the controller client used by this action."""
        self._cc = cc

    def get_controller_client(self):
        """Return the controller client, or None if unset."""
        return self._cc

    def set_build_id(self, build_id):
        """Set the id of the build this action belongs to."""
        self._build_id = build_id

    def get_build_id(self):
        """Return the build id, or None if unset."""
        return self._build_id

    def set_token(self, token):
        """Set the token used for the Authorization header."""
        self._token = token

    def get_token(self):
        """Return the token, or None if unset."""
        return self._token

    def set_blob_url_func(self, func):
        """Set the callable that maps a blob name to its URL."""
        self._blob_url = func

    def get_blob_upload_url(self, blob_name):
        """Return the URL for ``blob_name`` from the blob URL callable."""
        return self._blob_url(blob_name)

    def get_env(self):
        """Return the environment this action runs in."""
        return self._env

    def get_workspace_area(self):
        """Return an ick2.WorkspaceArea rooted at the env's workspace."""
        env = self.get_env()
        area = ick2.WorkspaceArea()
        area.set_root(env.get_workspace_directory())
        return area

    def encode_parameters(self, params):
        """Encode ``params`` for the action's snippet; subclass hook."""
        raise NotImplementedError()

    def encode64(self, params):
        """Return ``params`` (a dict) as base64-encoded JSON text."""
        assert isinstance(params, dict)
        as_text = json.dumps(params)
        as_bytes = as_text.encode('UTF-8')
        as_base64 = base64.b64encode(as_bytes)
        return as_base64.decode('UTF-8')

    def decode64(self, encoded):
        """Inverse of encode64: decode base64 JSON text to an object."""
        as_base64 = encoded.encode('UTF-8')
        as_bytes = base64.b64decode(as_base64)
        as_text = as_bytes.decode('UTF-8')
        return json.loads(as_text)

    def get_authz_headers(self):
        """Return HTTP headers carrying the token as a Bearer credential."""
        token = self.get_token()
        return {
            'Authorization': 'Bearer {}'.format(token),
        }

    def execute(self, params, step):
        """Run the action for ``step`` with ``params``; subclass hook."""
        raise NotImplementedError()


class ShellAction(Action):
    """Run the step's 'shell' snippet with bash."""

    def encode_parameters(self, params):  # pragma: no cover
        """Return a bash ``params`` function that prints the JSON params."""
        encoded = self.encode64(params)
        return 'params() { echo -n "%s" | base64 -d; }\n' % encoded

    def execute(self, params, step):
        """Run ``step['shell']`` via ``bash -exuc``; return its exit code."""
        prefix = self.encode_parameters(params)
        snippet = step['shell']
        argv = ['bash', '-exuc', prefix + snippet]
        exit_code = self._env.runcmd(argv)
        self._env.report(exit_code, 'action finished\n')
        return exit_code


class PythonAction(Action):
    """Run the step's 'python' snippet with python3."""

    def encode_parameters(self, params):  # pragma: no cover
        """Return Python source that binds ``params`` to the decoded dict."""
        encoded = self.encode64(params)
        prefix = (
            'import base64, json\n'
            'params = json.loads(base64.b64decode(\n'
            '    "{}").decode("utf8"))\n'
        ).format(encoded)
        return prefix

    def execute(self, params, step):
        """Run ``step['python']`` via ``python3 -c``; return its exit code."""
        prefix = self.encode_parameters(params)
        snippet = step['python']
        argv = ['python3', '-c', prefix + '\n' + snippet]
        exit_code = self._env.runcmd(argv)
        self._env.report(exit_code, 'action finished\n')
        return exit_code


class DebootstrapAction(Action):
    """Run debootstrap into the workspace directory."""

    default_mirror = 'http://deb.debian.org/debian'

    def encode_parameters(self, params):  # pragma: no cover
        pass

    def execute(self, params, step):
        """Run ``sudo debootstrap`` in the workspace; return its exit code.

        The suite is ``step['debootstrap']``; when that is missing or
        'auto', ``params['debian_codename']`` is used instead. The
        mirror is ``step['mirror']`` or ``default_mirror``.
        """
        suite = step.get('debootstrap')
        if suite is None or suite == 'auto':
            suite = params['debian_codename']

        mirror = step.get('mirror', self.default_mirror)

        env = self.get_env()
        workspace = env.get_workspace_directory()
        argv = ['sudo', 'debootstrap', suite, '.', mirror]
        exit_code = env.host_runcmd(argv, cwd=workspace)
        env.report(exit_code, 'action finished\n')
        return exit_code


class CreateWorkspaceAction(Action):
    """Empty the workspace directory."""

    def encode_parameters(self, params):  # pragma: no cover
        pass

    def execute(self, params, step):
        """Delete everything inside the workspace directory; return 0."""
        env = self.get_env()
        dirname = env.get_workspace_directory()
        # NOTE(review): the result of make_directory_empty is ignored and
        # success is always reported; confirm this is deliberate.
        make_directory_empty(env, dirname)
        env.report(0, 'Created or emptied workspace %s\n' % dirname)
        return 0


class ArchiveWorkspaceAction(Action):  # pragma: no cover
    """Tar up (part of) the workspace and upload it with curl."""

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Create a tarball of the workspace and upload it.

        The blob name is read from ``params`` under the key named by
        ``step['name_from']`` (default 'artifact_name'). If
        ``step['globs']`` is given, only the matching paths are
        archived; otherwise the whole workspace is.

        Returns 1 if the blob name is missing, otherwise the exit code
        of the failing ``tar`` or of ``curl``.
        """
        env = self.get_env()
        dirname = env.get_workspace_directory()

        name_from = step.get('name_from', 'artifact_name')
        blob_name = params.get(name_from)
        if not blob_name:
            # Name the parameter that was actually looked up.
            env.report(1, 'No {} parameter\n'.format(name_from))
            return 1

        globs = step.get('globs')
        if globs is None:
            names = ['.']
        else:
            names = self.match_globs(dirname, globs)

        url = self.get_blob_upload_url(blob_name)
        headers = self.get_authz_headers()

        env.report(None, 'Creating tarball from workspace\n')
        fd, tarball = tempfile.mkstemp()
        os.close(fd)
        # try/finally so the temporary tarball is removed even if a
        # command runner or the reporter raises.
        try:
            tar = ['sudo', 'tar', '-zcf', tarball, '-C', dirname] + names
            exit_code = env.host_runcmd(tar)
            if exit_code != 0:
                env.report(exit_code, 'tarball generation finished\n')
                return exit_code
            env.report(None, 'tarball generation finished\n')

            env.report(None, 'Uploading tarball to artifact store\n')
            header_args = [
                '-H{}:{}'.format(header, value)
                for header, value in headers.items()
            ]
            curl = ['curl', '-sk', '-T', tarball] + header_args + [url]
            exit_code = env.host_runcmd(curl)
            env.report(
                exit_code,
                'curl upload finished (exit code %s)\n' % exit_code)
            return exit_code
        finally:
            os.remove(tarball)

    def match_globs(self, workspace, globs):
        """Return normalised paths under ``workspace`` matching ``globs``."""
        return [
            os.path.normpath(name)
            for pat in globs
            for name in glob.glob(os.path.join(workspace, './' + pat))
        ]


class PopulateActionBase(Action):  # pragma: no cover
    """Shared logic for downloading an artifact and unpacking it.

    Subclasses set ``step_field`` and ``param_name`` and implement
    get_unpack_directory and mangle_exit_code.
    """

    step_field = None
    param_name = None

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Download the named artifact and unpack it.

        The artifact name is ``step[step_field]``; when that is empty
        or 'auto', ``params[param_name]`` is used. Returns 1 if no name
        can be determined, otherwise the exit code as transformed by
        mangle_exit_code.
        """
        env = self.get_env()

        step_value = step.get(self.step_field)
        name = step_value
        if not name or name == 'auto':
            name = params.get(self.param_name)
        if not name:
            # Report the value found in the step, as the message says,
            # not the empty value looked up from params.
            msg = '{} in action is {}, but no {} params\n'.format(
                self.step_field, step_value, self.param_name)
            env.report(1, msg)
            return 1

        env.report(None, 'Using {} for artifact name\n'.format(name))

        dirname = self.get_unpack_directory(env)
        make_directory_empty(env, dirname)
        exit_code = self.download_and_unpack_artifact(name, dirname)
        new_code = self.mangle_exit_code(exit_code)
        env.report(
            new_code,
            '{} finished (exit_code {} -> {})\n'.format(
                str(self), exit_code, new_code))
        return new_code

    def get_unpack_directory(self, env):
        """Return the directory to unpack into; subclass hook."""
        raise NotImplementedError()

    def mangle_exit_code(self, exit_code):
        """Map the raw exit code to the one reported; subclass hook."""
        raise NotImplementedError()

    def download_and_unpack_artifact(self, name, dirname):
        """Fetch artifact ``name`` with curl and untar it into ``dirname``.

        Returns whatever ``env.host_runcmd`` returns for the two
        commands.
        """
        url = self.get_blob_upload_url(name)
        headers = self.get_authz_headers()
        header_args = [
            '-H{}:{}'.format(header, value)
            for header, value in headers.items()
        ]
        curl = ['curl', '-sk'] + header_args + [url]
        untar = ['sudo', 'tar', '-zxf', '-', '-C', dirname]

        env = self.get_env()
        # presumably host_runcmd pipes curl's output into tar; confirm
        # against the environment implementation
        return env.host_runcmd(curl, untar)


class PopulateSystreeAction(PopulateActionBase):  # pragma: no cover
    """Unpack an artifact into the systree directory."""

    step_field = 'systree_name'
    param_name = 'systree_name'

    def __str__(self):
        return 'populate-systree'

    def get_unpack_directory(self, env):
        """Return the environment's systree directory."""
        return env.get_systree_directory()

    def mangle_exit_code(self, exit_code):
        """Return ``exit_code`` unchanged."""
        return exit_code


class PopulateWorkspaceAction(PopulateActionBase):  # pragma: no cover
    """Unpack an artifact into the workspace directory."""

    step_field = 'workspace_name'
    param_name = 'workspace_name'

    def __str__(self):
        return 'populate-workspace'

    def get_unpack_directory(self, env):
        """Return the environment's workspace directory."""
        return env.get_workspace_directory()

    def mangle_exit_code(self, exit_code):
        """Always return 0, whatever the download and unpack returned."""
        # We never fail this action. The artifact might not exist.
        return 0


class GitAction(Action):  # pragma: no cover
    """Clone a git repository into the workspace, or update it."""

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Clone ``params['git_url']`` into ``params['git_dir']``.

        If the directory already exists under the workspace, run
        ``git remote update --prune`` there instead of cloning.

        Returns 1 if ``git_dir`` or ``git_url`` is missing, or if
        ``git_dir`` is absolute or contains '..'; otherwise the exit
        code of git.
        """
        env = self.get_env()
        workspace = env.get_workspace_directory()

        # Each failed check must return: carrying on would crash on a
        # None value or run git with unvalidated input.
        git_dir = params.get('git_dir')
        if git_dir is None:
            env.report(1, 'git_dir not provided\n')
            return 1
        if git_dir.startswith('/') or '..' in git_dir:
            env.report(1, 'git_dir not acceptable\n')
            return 1

        git_url = params.get('git_url')
        if git_url is None:
            env.report(1, 'git_url not provided\n')
            return 1

        pathname = os.path.join(workspace, git_dir)
        if os.path.exists(pathname):
            argv = ['git', 'remote', '-v', 'update', '--prune']
            cwd = pathname
        else:
            argv = ['git', 'clone', '-v', git_url, git_dir]
            cwd = workspace

        exit_code = env.host_runcmd(argv, cwd=cwd)
        env.report(exit_code, 'git finished (exit code %d)\n' % exit_code)
        return exit_code


class RsyncAction(Action):  # pragma: no cover
    """Copy a workspace subdirectory to a remote target with rsync."""

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Rsync ``params['rsync_src']`` to ``params['rsync_target']``.

        Returns 1 if either parameter is missing, if the source is not
        a relative path inside the workspace, or if the target has no
        ':' in it; otherwise the exit code of rsync.
        """
        env = self.get_env()
        workspace = env.get_workspace_directory()

        # Each failed check must return: carrying on would crash on a
        # None value or run rsync with unvalidated input.
        rsync_src = params.get('rsync_src')
        if rsync_src is None:
            env.report(1, 'rsync_src not provided\n')
            return 1
        if not self._is_relative(rsync_src):
            env.report(1, 'rsync_src not acceptable\n')
            return 1

        rsync_target = params.get('rsync_target')
        if rsync_target is None:
            env.report(1, 'rsync_target not provided\n')
            return 1
        if not self._remote(rsync_target):
            env.report(1, 'rsync_target not acceptable\n')
            return 1

        argv = [
            'rsync', '-av', '--delete-after',
            './{}/.'.format(rsync_src),
            '{}/.'.format(rsync_target),
        ]
        exit_code = env.host_runcmd(argv, cwd=workspace)
        env.report(exit_code, 'rsync finished (exit code %d)\n' % exit_code)
        return exit_code

    def _is_relative(self, src):
        """Return True if ``src`` is relative and has no parent reference."""
        if src.startswith('/'):
            return False
        # The substring test alone misses a bare or trailing '..', which
        # would make './<src>/.' point outside the workspace.
        if '../' in src or '..' in src.split('/'):
            return False
        return True

    def _remote(self, target):
        """Return True if ``target`` contains a ':'."""
        return ':' in target


class DputAction(Action):  # pragma: no cover
    """Upload the workspace's *.changes files with dput."""

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Run ``dput ick *.changes`` in the workspace; return its code."""
        env = self.get_env()
        workspace = env.get_workspace_directory()

        # Run through sh so that the *.changes glob is expanded.
        argv = ['sh', '-c', 'dput ick *.changes']
        exit_code = env.host_runcmd(argv, cwd=workspace)
        env.report(exit_code, 'dput finished (exit code %d)\n' % exit_code)
        return exit_code


class NotifyAction(Action):  # pragma: no cover
    """Ask the controller to send a notification about the build."""

    def encode_parameters(self, params):
        pass

    def execute(self, params, step):
        """Send a notification about the build to its recipients.

        The recipients come from the 'notify' entry of the build's own
        'parameters', as fetched from the controller; the ``params``
        argument is not used. Without a 'notify' entry nothing is sent.

        Returns 0 in both cases, matching the exit code reported.
        Raises AssertionError if no controller client has been set.
        """
        env = self.get_env()
        cc = self.get_controller_client()
        assert cc is not None
        build_id = self.get_build_id()

        env.report(None, 'Notifying about build ending\n')

        build_path = '/builds/{}'.format(build_id)
        build = cc.show(build_path)
        build_params = build.get('parameters', {})
        if 'notify' not in build_params:
            env.report(
                0,
                'NOT notifying about build ending: no "notify" parameter.\n')
            return 0

        recipients = build_params['notify']

        log = cc.get_log(build_id).decode('utf-8')

        notify = {
            'recipients': recipients,
            'build': self.mangle_build(build),
            'log': log,
        }

        cc.notify(notify)

        env.report(0, 'Notified about build {} ending\n'.format(build_id))
        return 0

    def mangle_build(self, build):
        """Return a deep copy of ``build`` with a 'status' field added.

        The status is 'BUILDING' when the build has no 'exit_code' (or
        it is None), 'SUCCESS' when it is 0, and 'FAILED' otherwise.
        """
        b = copy.deepcopy(build)
        exit_code = build.get('exit_code')
        if exit_code is None:
            b['status'] = 'BUILDING'
        elif exit_code == 0:
            b['status'] = 'SUCCESS'
        else:
            b['status'] = 'FAILED'
        return b


def make_directory_empty(env, dirname):
    """Delete everything inside ``dirname`` but keep the directory.

    Runs ``sudo find dirname -mindepth 1 -delete`` through
    ``env.runcmd`` and returns that call's result.
    """
    return env.runcmd(
        ['sudo', 'find', dirname, '-mindepth', '1', '-delete'])