# Copyright (C) 2018  Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Actions that execute individual steps, and the factory creating them."""


import base64
import json
import logging
import os
import tempfile

import ick2


class UnknownStepError(Exception):
    """Raised when a step specification matches no known action."""


class ActionFactory:
    """Create action objects, and the environments they run in, for steps.

    A step specification (``spec``) is a dict. Its optional ``where`` key
    selects the execution environment; the remaining keys select which
    action class handles the step (see ``_create_action_object``).
    """

    # Maps the value of a step's ``where`` field to an environment class.
    _classes = {
        'host': ick2.HostEnvironment,
        'chroot': ick2.ChrootEnvironment,
        'container': ick2.ContainerEnvironment,
    }

    def __init__(self, systree, workspace_area, reporter):
        """Remember the systree, workspace area root and reporter.

        All three are passed on unchanged to the environments and
        workspace areas this factory creates.
        """
        self._systree = systree
        self._workspace_area = workspace_area
        self._reporter = reporter
        self._token = None
        self._blob_url = None
        self._extra_env = {}

    def add_env_var(self, name, value):  # pragma: no cover
        """Add a variable to the extra environment given to environments."""
        self._extra_env[name] = value

    def set_token(self, token):
        """Set the token handed to every action created later."""
        self._token = token

    def get_token(self):
        """Return the token set with ``set_token`` (None if unset)."""
        return self._token

    def set_blob_url_func(self, func):
        """Set the callable that maps a blob name to its URL."""
        self._blob_url = func

    def get_blob_url_func(self):
        """Return the blob URL callable (None if unset)."""
        return self._blob_url

    def get_allowed_environments(self):
        """Return the list of valid values for a step's ``where`` field."""
        return list(self._classes)

    def get_workspace_area(self):
        """Return an ``ick2.WorkspaceArea`` rooted at the configured area."""
        area = ick2.WorkspaceArea()
        area.set_root(self._workspace_area)
        return area

    def create_environment(self, spec, project_name):
        """Create the execution environment requested by ``spec``.

        ``spec['where']`` picks the environment class and defaults to
        ``'host'``. A workspace for ``project_name`` is created in the
        workspace area and its directory is given to the environment.

        Raises AssertionError if the requested environment is not one of
        ``get_allowed_environments()``.
        """
        where = spec.get('where', 'host')
        assert where in self.get_allowed_environments(), (
            'Unknown environment %r' % where)
        env_class = self._classes[where]

        area = self.get_workspace_area()
        ws = area.create_workspace(project_name)

        env = env_class(self._systree, ws.get_directory(), self._reporter)
        env.set_extra_env(self._extra_env)
        return env

    def create_action(self, spec, project_name):
        """Create a ready-to-execute action for the step ``spec``.

        The action gets its own environment plus this factory's token and
        blob URL callable.

        Raises UnknownStepError if ``spec`` matches no action.
        """
        env = self.create_environment(spec, project_name)
        action = self._create_action_object(env, spec)
        action.set_token(self.get_token())
        action.set_blob_url_func(self.get_blob_url_func())
        return action

    def _create_action_object(self, env, spec):
        """Pick the action class for ``spec`` and instantiate it on ``env``.

        The checks are ordered: the first matching key wins, and the
        ``action`` field is only consulted when none of the keys match.
        """
        if 'shell' in spec:
            return ShellAction(env)
        if 'python' in spec:
            return PythonAction(env)
        if 'debootstrap' in spec:
            return DebootstrapAction(env)
        if 'archive' in spec:
            return ArchiveWorkspaceAction(env)

        action_name = spec.get('action')
        if action_name == 'populate_systree':
            return PopulateSystreeAction(env)
        if action_name == 'create_workspace':
            return CreateWorkspaceAction(env)

        raise UnknownStepError('Unknown action %r' % spec)


class Action:  # pragma: no cover
    """Base class for step actions.

    Subclasses implement ``encode_parameters`` and ``execute``. This class
    holds the environment, the token and the blob URL callable, and offers
    helpers for encoding parameters and building HTTP headers.
    """

    def __init__(self, env):
        """Bind the action to the environment ``env`` it will run in."""
        self._env = env
        self._token = None
        self._blob_url = None

    def set_token(self, token):
        """Set the token used in the Authorization header."""
        self._token = token

    def get_token(self):
        """Return the token set with ``set_token`` (None if unset)."""
        return self._token

    def set_blob_url_func(self, func):
        """Set the callable that maps a blob name to its URL."""
        self._blob_url = func

    def get_blob_upload_url(self, blob_name):
        """Return the URL for ``blob_name`` from the blob URL callable.

        Despite the name, the same URL is used for downloads by
        ``PopulateSystreeAction``. ``set_blob_url_func`` must have been
        called first.
        """
        return self._blob_url(blob_name)

    def get_env(self):
        """Return the environment this action runs in."""
        return self._env

    def get_workspace_area(self):
        """Return a workspace area rooted at the environment's workspace."""
        env = self.get_env()
        area = ick2.WorkspaceArea()
        area.set_root(env.get_workspace_directory())
        return area

    def encode_parameters(self, params):
        """Return ``params`` encoded for the action's snippet language.

        Must be overridden by subclasses.
        """
        raise NotImplementedError()

    def encode64(self, params):
        """Return dict ``params`` as base64 text of its UTF-8 JSON form."""
        assert isinstance(params, dict)
        as_text = json.dumps(params)
        as_bytes = as_text.encode('UTF-8')
        as_base64 = base64.b64encode(as_bytes)
        return as_base64.decode('UTF-8')

    def decode64(self, encoded):
        """Inverse of ``encode64``: decode base64 text back to an object."""
        as_base64 = encoded.encode('UTF-8')
        as_bytes = base64.b64decode(as_base64)
        as_text = as_bytes.decode('UTF-8')
        return json.loads(as_text)

    def get_authz_headers(self):
        """Return HTTP headers carrying the token as a bearer token."""
        token = self.get_token()
        return {
            'Authorization': 'Bearer {}'.format(token),
        }

    def _curl_header_args(self):
        """Return the authorization headers as curl ``-H`` arguments."""
        headers = self.get_authz_headers()
        return [
            '-H{}:{}'.format(name, value)
            for name, value in headers.items()
        ]

    def execute(self, params, step):
        """Run the action for ``step`` and return its exit code.

        Must be overridden by subclasses.
        """
        raise NotImplementedError()


class ShellAction(Action):
    """Run the shell snippet in ``step['shell']`` with bash."""

    def encode_parameters(self, params):  # pragma: no cover
        """Return a shell function ``params`` that prints the JSON params.

        The JSON is embedded base64-encoded so that no quoting of the
        parameter values is needed in the generated shell code.
        """
        encoded = self.encode64(params)
        return 'params() { echo -n "%s" | base64 -d; }\n' % encoded

    def execute(self, params, step):
        """Run the snippet via ``bash -exuc`` and return its exit code."""
        prefix = self.encode_parameters(params)
        snippet = step['shell']
        argv = ['bash', '-exuc', prefix + snippet]
        exit_code = self._env.runcmd(argv)
        self._env.report(exit_code, 'action finished\n')
        return exit_code


class PythonAction(Action):
    """Run the Python snippet in ``step['python']`` with python3."""

    def encode_parameters(self, params):  # pragma: no cover
        """Return Python code that defines ``params`` as the decoded dict."""
        encoded = self.encode64(params)
        prefix = (
            'import base64, json\n'
            'params = json.loads(base64.b64decode(\n'
            '    "{}").decode("utf8"))\n'
        ).format(encoded)
        return prefix

    def execute(self, params, step):
        """Run the snippet via ``python3 -c`` and return its exit code."""
        prefix = self.encode_parameters(params)
        snippet = step['python']
        argv = ['python3', '-c', prefix + '\n' + snippet]
        exit_code = self._env.runcmd(argv)
        self._env.report(exit_code, 'action finished\n')
        return exit_code


class DebootstrapAction(Action):
    """Run debootstrap in the workspace directory."""

    default_mirror = 'http://deb.debian.org/debian'

    def encode_parameters(self, params):  # pragma: no cover
        """Do nothing: this action passes no parameters to a snippet."""
        pass

    def execute(self, params, step):
        """Run ``sudo debootstrap`` on the host and return its exit code.

        The suite comes from ``step['debootstrap']``; when that is missing
        or ``'auto'``, ``params['debian_codename']`` is used instead. The
        mirror comes from ``step['mirror']`` or ``default_mirror``.
        """
        suite = step.get('debootstrap')
        if suite is None or suite == 'auto':
            suite = params['debian_codename']

        mirror = step.get('mirror', self.default_mirror)

        env = self.get_env()
        workspace = env.get_workspace_directory()

        # The target is '.', so the command runs with the workspace as cwd.
        argv = ['sudo', 'debootstrap', suite, '.', mirror]
        exit_code = self._env.host_runcmd(argv, cwd=workspace)
        self._env.report(exit_code, 'action finished\n')
        return exit_code


class CreateWorkspaceAction(Action):
    """Empty the environment's workspace directory."""

    def encode_parameters(self, params):  # pragma: no cover
        """Do nothing: this action passes no parameters to a snippet."""
        pass

    def execute(self, params, step):
        """Delete everything in the workspace directory and return 0.

        NOTE(review): the exit code of ``make_directory_empty`` is ignored
        and success is always reported; confirm that this is intended.
        """
        env = self.get_env()
        dirname = env.get_workspace_directory()
        make_directory_empty(env, dirname)
        self._env.report(0, 'Created or emptied workspace %s\n' % dirname)
        return 0


class ArchiveWorkspaceAction(Action):  # pragma: no cover
    """Pack the workspace into a tarball and upload it as a blob."""

    def encode_parameters(self, params):
        """Do nothing: this action passes no parameters to a snippet."""
        pass

    def execute(self, params, step):
        """Create a gzipped tarball of the workspace and upload it.

        The blob name is ``params['systree_name']`` (default ``'noname'``).
        Returns the exit code of tar if it fails, otherwise the exit code
        of the curl upload. The temporary tarball is removed on every
        path, including when a command or report call raises.
        """
        blob_name = params.get('systree_name', 'noname')

        env = self.get_env()
        dirname = env.get_workspace_directory()

        url = self.get_blob_upload_url(blob_name)
        header_args = self._curl_header_args()

        # Only the file name is needed; tar (re)writes the file itself.
        fd, tarball = tempfile.mkstemp()
        os.close(fd)
        try:
            tar = ['sudo', 'tar', '-zcf', tarball, '-C', dirname, '.']
            exit_code = self._env.host_runcmd(tar)
            if exit_code != 0:
                self._env.report(exit_code, 'tarball generation finished\n')
                return exit_code
            self._env.report(None, 'tarball generation finished\n')

            curl = ['curl', '-sk', '-T', tarball] + header_args + [url]
            exit_code = self._env.host_runcmd(curl)
            self._env.report(
                exit_code,
                'curl upload finished (exit code %s)\n' % exit_code)
            return exit_code
        finally:
            os.remove(tarball)


class PopulateSystreeAction(Action):  # pragma: no cover
    """Fill the systree directory from a previously stored blob."""

    def encode_parameters(self, params):
        """Do nothing: this action passes no parameters to a snippet."""
        pass

    def execute(self, params, step):
        """Empty the systree directory, then download and unpack a systree.

        The systree name is ``step['systree_name']``; when that is missing,
        empty or ``'auto'``, ``params['systree_name']`` is used instead.
        Returns the exit code from ``download_and_unpack_systree``.
        """
        systree_name = step.get('systree_name')
        if not systree_name or systree_name == 'auto':
            systree_name = params['systree_name']
            logging.info(
                'No systree name or it is "auto", using %s', systree_name)

        env = self.get_env()
        systree_dir = env.get_systree_directory()
        make_directory_empty(env, systree_dir)
        return self.download_and_unpack_systree(systree_name, systree_dir)

    def download_and_unpack_systree(self, systree_name, dirname):
        """Fetch blob ``systree_name`` with curl and untar it in ``dirname``.

        Both argument vectors are given to a single ``host_runcmd`` call;
        presumably it pipes curl's output into tar (tar reads its archive
        from stdin via ``-f -``) -- verify against the environment class.
        Returns the exit code of that call.
        """
        url = self.get_blob_upload_url(systree_name)
        curl = ['curl', '-sk'] + self._curl_header_args() + [url]
        untar = ['sudo', 'tar', '-zxf', '-', '-C', dirname]

        exit_code = self._env.host_runcmd(curl, untar)
        self._env.report(exit_code, 'action finished\n')
        return exit_code


def make_directory_empty(env, dirname):
    """Delete everything below ``dirname`` but keep the directory itself.

    Runs ``sudo find ... -mindepth 1 -delete`` through ``env.runcmd`` and
    returns that call's result.
    """
    return env.runcmd(
        ['sudo', 'find', dirname, '-mindepth', '1', '-delete'])