diff options
author | Lars Wirzenius <liw@liw.fi> | 2018-03-30 12:13:53 +0300 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2018-03-30 12:13:53 +0300 |
commit | 5a93f67af48df039fac38aa064ec96834232e72e (patch) | |
tree | 1e95c895d08022e94937cceb9efc363c7b1c3575 | |
parent | 3f8b808f63fa5b321e45644df2c83f8d75aa623d (diff) | |
parent | 17253c6e422b52988b5abef43c2b797a5d2a6e8d (diff) | |
download | ick2-5a93f67af48df039fac38aa064ec96834232e72e.tar.gz |
Merge: rewrite worker manager
-rw-r--r-- | NEWS | 3 | ||||
-rwxr-xr-x | check | 35 | ||||
-rw-r--r-- | ick2/__init__.py | 26 | ||||
-rw-r--r-- | ick2/actionenvs.py | 167 | ||||
-rw-r--r-- | ick2/actionenvs_tests.py | 138 | ||||
-rw-r--r-- | ick2/actions.py | 293 | ||||
-rw-r--r-- | ick2/actions_tests.py | 230 | ||||
-rw-r--r-- | ick2/client.py | 238 | ||||
-rw-r--r-- | ick2/client_tests.py | 274 | ||||
-rw-r--r-- | ick2/workspace.py | 64 | ||||
-rw-r--r-- | ick2/workspace_tests.py | 68 | ||||
-rwxr-xr-x | worker_manager | 492 |
12 files changed, 1573 insertions, 455 deletions
@@ -25,6 +25,9 @@ Version 0.28+git, not yet released * Artifact store can handle large artifacts now. Tested with 100 gigabytes. +* The worker manager has been largely rewritten, with (unit) tests + added. The new code is more maintainable and more easily extensible. + Version 0.28, released 2018-02-24 ---------------------------------- @@ -33,23 +33,27 @@ title() title Remote or local yarns? remote=no unit=yes +yarns=yes if [ "$#" -gt 0 ] then case "$1" in https://*) remote=yes unit=no + yarns=yes remote_url="$1" shift 1 ;; yarns) remote=no unit=no + yarns=yes shift 1 ;; local) remote=no unit=yes + yarns=no shift 1 ;; *) @@ -85,22 +89,25 @@ then pylint3 --rcfile pylint.conf $python_sources fi -title Yarns -if [ "$remote" = no ] +if [ "$yarns" = yes ] then - impl=yarns/900-local.yarn - args="" -else - impl=yarns/900-remote.yarn - args="--env ICK_URL=$remote_url" + title Yarns + if [ "$remote" = no ] + then + impl=yarns/900-local.yarn + args="" + else + impl=yarns/900-remote.yarn + args="--env ICK_URL=$remote_url" + fi + yarn yarns/[^9]*.yarn yarns/900-implements.yarn "$impl" \ + --shell python2 \ + --shell-arg '' \ + --shell-library yarns/lib.py \ + --cd-datadir \ + $args \ + "$@" fi -yarn yarns/[^9]*.yarn yarns/900-implements.yarn "$impl" \ - --shell python2 \ - --shell-arg '' \ - --shell-library yarns/lib.py \ - --cd-datadir \ - $args \ - "$@" title OK echo "All tests pass" diff --git a/ick2/__init__.py b/ick2/__init__.py index f5770f6..8673046 100644 --- a/ick2/__init__.py +++ b/ick2/__init__.py @@ -51,3 +51,29 @@ from .controllerapi import ( ) from .blobapi import BlobAPI from .blob_store import BlobStore + +from .client import ( + HttpAPI, + HttpError, + ControllerClient, + BlobClient, + Reporter, +) +from .actionenvs import ( + Runner, + ActionEnvironment, + HostEnvironment, + ChrootEnvironment, + ContainerEnvironment, +) +from .workspace import WorkspaceArea, Workspace +from .actions import ( + ActionFactory, + UnknownStepError, + ShellAction, + PythonAction, + DebootstrapAction, + CreateWorkspaceAction, + ArchiveWorkspaceAction, + PopulateSystreeAction, +) diff --git a/ick2/actionenvs.py b/ick2/actionenvs.py new file mode 100644 index 0000000..7a2b57a --- /dev/null +++ b/ick2/actionenvs.py @@ -0,0 +1,167 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import logging +import os +import subprocess + + +import cliapp + + +class Runner: + + def __init__(self, reporter, maxbuf=128*1024): + self._reporter = reporter + self._buffers = { + 'stdout': b'', + 'stderr': b'', + } + self._maxbuf = maxbuf + self._timeout = 1.0 + + def runcmd(self, *argvs, **kwargs): + for argv in argvs: + logging.debug('Runner.runcmd: argv: %r', argv) + for key in kwargs: + logging.debug('Runner.runcmd: kwargs: %r=%r', key, kwargs[key]) + assert all(argv is not None for argv in argvs) + exit_code, _, _ = cliapp.runcmd_unchecked( + *argvs, + stdout_callback=self.capture_stdout, + stderr=subprocess.STDOUT, + output_timeout=self._timeout, + timeout_callback=self.flush, + **kwargs + ) + self.flush() + logging.debug('Runner.runcmd: finished, exit code: %d', exit_code) + return exit_code + + def capture_stdout(self, data): + return self.capture('stdout', data) + + def capture(self, stream_name, data): + self._buffers[stream_name] += data + if len(self._buffers[stream_name]) >= self._maxbuf: + self.flush() + + return b'' + + def flush(self): + stdout = self._buffers['stdout'] + stderr = self._buffers['stderr'] + self._reporter.report(None, stdout, stderr) + self._buffers['stdout'] = b'' + self._buffers['stderr'] = b'' + + +class Mounter: # pragma: no cover + + def __init__(self, mounts, runner): + self._mounts = mounts + self._runner = runner + + def __enter__(self): + self.mount() + return self + + def __exit__(self, *args): + self.unmount() + + def mount(self): + for dirname, mp in self._mounts: + if not os.path.exists(mp): + os.mkdir(mp) + self._runner.runcmd(['sudo', 'mount', '--bind', dirname, mp]) + + def unmount(self): + for dirname, mp in reversed(self._mounts): + try: + self._runner.runcmd(['sudo', 'umount', mp]) + except BaseException as e: + logging.error( + 'Ignoring error while unmounting %s: %s', mp, str(e)) + + +class ActionEnvironment: # pragma: no cover + + def __init__(self, systree, workspace, reporter): + super().__init__() + self._systree = systree + self._workspace = workspace + self._reporter = reporter + + def get_systree_directory(self): + return self._systree + + def get_workspace_directory(self): + return self._workspace + + def get_mounts(self): + return [] + + def report(self, exit_code, msg): + self._reporter.report(exit_code, msg, None) + + def runcmd(self, argv): + raise NotImplementedError() + + def host_runcmd(self, *argvs, cwd=None): + env = self.get_env_vars() + runner = Runner(self._reporter) + mounts = self.get_mounts() + with Mounter(mounts, runner): + return runner.runcmd(*argvs, cwd=cwd, env=env) + + def get_env_vars(self): + env = dict(os.environ) + env.update({ + 'LC_ALL': 'C', + 'DEBIAN_FRONTEND': 'noninteractive', + }) + return env + + +class HostEnvironment(ActionEnvironment): + + def runcmd(self, argv): + return self.host_runcmd(argv, cwd=self._workspace) + + +class ChrootEnvironment(ActionEnvironment): + + def get_mounts(self): # pragma: no cover + return [ + ('/proc', os.path.join(self._workspace, 'proc')), + ('/sys', os.path.join(self._workspace, 'sys')), + ] + + def runcmd(self, argv): + prefix = ['sudo', 'chroot', self._workspace] + return self.host_runcmd(prefix + argv) + + +class ContainerEnvironment(ActionEnvironment): + + def runcmd(self, argv): + bind = '{}:/workspace'.format(self._workspace) + prefix = [ + 'sudo', 'systemd-nspawn', + '-D', self._systree, + '--bind', bind, + '--chdir', '/workspace', + ] + return self.host_runcmd(prefix + argv) diff --git a/ick2/actionenvs_tests.py b/ick2/actionenvs_tests.py new file mode 100644 index 0000000..4643527 --- /dev/null +++ b/ick2/actionenvs_tests.py @@ -0,0 +1,138 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import unittest + + +import ick2 + + +class FakeReporter: + + def __init__(self): + self.stdout = b'' + self.stderr = b'' + + def report(self, exit_code, stdout, stderr): + self.stdout += stdout + self.stderr += stderr + + +class RunnerTests(unittest.TestCase): + + def test_runs_echo(self): + reporter = FakeReporter() + r = ick2.Runner(reporter) + exit_code = r.runcmd(['echo', 'hello', 'world']) + self.assertEqual(exit_code, 0) + self.assertEqual(reporter.stdout, b'hello world\n') + + def test_captures_lots_of_output(self): + reporter = FakeReporter() + r = ick2.Runner(reporter, maxbuf=1) + exit_code = r.runcmd(['echo', 'hello', 'world']) + self.assertEqual(exit_code, 0) + self.assertEqual(reporter.stdout, b'hello world\n') + + def test_captures_stderr(self): + reporter = FakeReporter() + r = ick2.Runner(reporter) + exit_code = r.runcmd(['sh', '-c', 'echo eek 1>&2 ; exit 2']) + self.assertEqual(exit_code, 2) + self.assertEqual(reporter.stdout, b'eek\n') + + +class EnvironmentTestsBase(unittest.TestCase): + + def setUp(self): + self.workspace = '/workspace' + self.fake = FakeRuncmd() + self.env = self.create_env() + self.env.host_runcmd = self.fake + + def create_env(self): + return DummyEnvironment(None, self.workspace, None) + + def expected_argv(self, argv): + return argv + + def test_runs_argv(self): + argv = ['echo', 'hello', 'world'] + self.fake.exit_code = 42 + exit_code = self.env.runcmd(argv) + self.assertEqual(self.fake.argv, self.expected_argv(argv)) + self.assertEqual(self.fake.exit_code, exit_code) + + +class DummyEnvironment(ick2.ActionEnvironment): + + def runcmd(self, argv): + return self.host_runcmd(argv) + + +class HostEnvironmentTests(EnvironmentTestsBase): + + def create_env(self): + return ick2.HostEnvironment(None, self.workspace, None) + + def expected_argv(self, argv): + return argv + + def test_pipeline(self): + reporter = FakeReporter() + env = ick2.HostEnvironment(None, None, reporter) + + echo = ['echo', 'hello', 'world'] + cat = ['cat'] + exit_code = env.host_runcmd(echo, cat) + self.assertEqual(exit_code, 0) + self.assertEqual(reporter.stdout, b'hello world\n') + self.assertEqual(reporter.stderr, b'') + + +class ChrootEnvironmentTests(EnvironmentTestsBase): + + def create_env(self): + return ick2.ChrootEnvironment(None, self.workspace, None) + + def expected_argv(self, argv): + return ['sudo', 'chroot', self.workspace] + argv + + +class ContainerEnvironmentTests(EnvironmentTestsBase): + + def create_env(self): + self.systree = 'systree' + self.workspace = 'workspace' + return ick2.ContainerEnvironment(self.systree, self.workspace, None) + + def expected_argv(self, argv): + prefix = [ + 'sudo', 'systemd-nspawn', '-D', self.systree, + '--bind', '{}:/workspace'.format(self.workspace), + '--chdir', '/workspace', + ] + return prefix + argv + + +class FakeRuncmd: + + def __init__(self): + self.argv = None + self.exit_code = None + + def __call__(self, argv, cwd=None): + self.argv = argv + return self.exit_code diff --git a/ick2/actions.py b/ick2/actions.py new file mode 100644 index 0000000..fef6889 --- /dev/null +++ b/ick2/actions.py @@ -0,0 +1,293 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import base64 +import json +import logging +import os +import tempfile + + +import cliapp + + +import ick2 + + +class UnknownStepError(Exception): + + pass + + +class ActionFactory: + + _classes = { + 'host': ick2.HostEnvironment, + 'chroot': ick2.ChrootEnvironment, + 'container': ick2.ContainerEnvironment, + } + + def __init__(self, systree, workspace_area, reporter): + self._systree = systree + self._workspace_area = workspace_area + self._reporter = reporter + self._token = None + self._blob_url = None + + def set_token(self, token): + self._token = token + + def get_token(self): + return self._token + + def set_blob_url_func(self, func): + self._blob_url = func + + def get_blob_url_func(self): + return self._blob_url + + def get_allowed_environments(self): + return list(self._classes.keys()) + + def create_environment(self, spec, project_name): + env = spec.get('where', 'host') + assert env in self.get_allowed_environments() + env_class = self._classes[env] + area = ick2.WorkspaceArea() + area.set_root(self._workspace_area) + ws = area.create_workspace(project_name) + return env_class(self._systree, ws.get_directory(), self._reporter) + + def create_action(self, spec, project_name): + env = self.create_environment(spec, project_name) + action = self._create_action_object(env, spec) + action.set_token(self.get_token()) + action.set_blob_url_func(self.get_blob_url_func()) + return action + + def _create_action_object(self, env, spec): + if 'shell' in spec: + return ShellAction(env) + if 'python' in spec: + return PythonAction(env) + if 'debootstrap' in spec: + return DebootstrapAction(env) + if 'archive' in spec: + return ArchiveWorkspaceAction(env) + if spec.get('action') == 'populate_systree': + return PopulateSystreeAction(env) + if spec.get('action') == 'create_workspace': + return CreateWorkspaceAction(env) + raise UnknownStepError('Unknown action %r' % spec) + + +class Action: # pragma: no cover + + def __init__(self, env): + self._env = env + self._token = None + self._blob_url = None + + def set_token(self, token): + self._token = token + + def get_token(self): + return self._token + + def set_blob_url_func(self, func): + self._blob_url = func + + def get_blob_upload_url(self, blob_name): + return self._blob_url(blob_name) + + def get_env(self): + return self._env + + def get_workspace_area(self): + env = self.get_env() + area = ick2.WorkspaceArea() + area.set_root(env.get_workspace_directory()) + return area + + def encode_parameters(self, parsms): + raise NotImplementedError() + + def encode64(self, params): + assert isinstance(params, dict) + as_text = json.dumps(params) + as_bytes = as_text.encode('UTF-8') + as_base64 = base64.b64encode(as_bytes) + return as_base64.decode('UTF-8') + + def decode64(self, encoded): + as_base64 = encoded.encode('UTF-8') + as_bytes = base64.b64decode(as_base64) + as_text = as_bytes.decode('UTF-8') + return json.loads(as_text) + + def get_authz_headers(self): + token = self.get_token() + return { + 'Authorization': 'Bearer {}'.format(token), + } + + def execute(self, params, work): + raise NotImplementedError() + + +class ShellAction(Action): + + def encode_parameters(self, params): # pragma: no cover + encoded = self.encode64(params) + return 'params() { echo -n "%s" | base64 -d; }\n' % encoded + + def execute(self, params, step): + prefix = self.encode_parameters(params) + snippet = step['shell'] + argv = ['bash', '-exuc', prefix + snippet] + exit_code = self._env.runcmd(argv) + self._env.report(exit_code, 'action finished\n') + return exit_code + + +class PythonAction(Action): + + def encode_parameters(self, params): # pragma: no cover + encoded = self.encode64(params) + prefix = ( + 'import base64, json\n' + 'params = json.loads(base64.b64decode(\n' + ' "{}").decode("utf8"))\n' + ).format(encoded) + return prefix + + def execute(self, params, step): + prefix = self.encode_parameters(params) + snippet = step['python'] + argv = ['python3', '-c', prefix + '\n' + snippet] + exit_code = self._env.runcmd(argv) + self._env.report(exit_code, 'action finished\n') + return exit_code + + +class DebootstrapAction(Action): + + default_mirror = 'http://deb.debian.org/debian' + + def encode_parameters(self, params): # pragma: no cover + pass + + def execute(self, params, step): + suite = step.get('debootstrap') + if suite is None or suite == 'auto': + suite = params['debian_codename'] + mirror = step.get('mirror', self.default_mirror) + + env = self.get_env() + workspace = env.get_workspace_directory() + argv = ['sudo', 'debootstrap', suite, '.', mirror] + exit_code = self._env.host_runcmd(argv, cwd=workspace) + self._env.report(exit_code, 'action finished\n') + return exit_code + + +class CreateWorkspaceAction(Action): + + def encode_parameters(self, params): # pragma: no cover + pass + + def execute(self, params, step): + logging.debug('CreateWorkspaceAction: params=%r', params) + logging.debug('CreateWorkspaceAction: step=%r', step) + + env = self.get_env() + workspace = env.get_workspace_directory() + self._env.report(0, 'Created workspace %s\n' % workspace) + return 0 + + +class ArchiveWorkspaceAction(Action): # pragma: no cover + + def encode_parameters(self, params): + pass + + def execute(self, params, step): + blob_name = params.get('systree_name', 'noname') + + env = self.get_env() + dirname = env.get_workspace_directory() + + url = self.get_blob_upload_url(blob_name) + headers = self.get_authz_headers() + + logging.debug('ArchiveWorkspaceAction: url=%r', url) + logging.debug('ArchiveWorkspaceAction: headers=%r', headers) + + assert url is not None + assert headers is not None + + fd, tarball = tempfile.mkstemp() + os.close(fd) + tar = ['sudo', 'tar', '-zcf', tarball, '-C', dirname, '.'] + exit_code = self._env.host_runcmd(tar) + if exit_code != 0: + self._env.report(exit_code, 'tarball generation finished\n') + os.remove(tarball) + return exit_code + self._env.report(None, 'tarball generation finished\n') + + curl = ['curl', '-sk', '-T', tarball] + [ + '-H{}:{}'.format(name, value) + for name, value in headers.items() + ] + [url] + exit_code = self._env.host_runcmd(curl) + self._env.report( + exit_code, 'curl upload finished (exit code %s)\n' % exit_code) + os.remove(tarball) + return exit_code + + +class PopulateSystreeAction(Action): # pragma: no cover + + def encode_parameters(self, params): + pass + + def execute(self, params, step): + systree_name = step.get('systree_name') + if not systree_name: + return 1 + + env = self.get_env() + systree_dir = env.get_systree_directory() + self.make_directory_empty(systree_dir) + return self.download_and_unpack_systree(systree_name, systree_dir) + + def make_directory_empty(self, dirname): + return cliapp.runcmd( + ['sudo', 'find', dirname, '-mindepth', '1', '-delete']) + + def download_and_unpack_systree(self, systree_name, dirname): + url = self.get_blob_upload_url(systree_name) + headers = self.get_authz_headers() + curl = ['curl', '-sk'] + [ + '-H{}:{}'.format(name, value) + for name, value in headers.items() + ] + [url] + + untar = ['sudo', 'tar', '-zxf', '-', '-C', dirname] + + exit_code = self._env.host_runcmd(curl, untar) + self._env.report(exit_code, 'action finished\n') + return exit_code diff --git a/ick2/actions_tests.py b/ick2/actions_tests.py new file mode 100644 index 0000000..8220fe2 --- /dev/null +++ b/ick2/actions_tests.py @@ -0,0 +1,230 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import os +import shutil +import tempfile +import unittest + + +import ick2 + + +class ActionFactoryTests(unittest.TestCase): + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.workspace_area = self.tempdir + self.project = 'magic' + self.af = ick2.ActionFactory('systree', self.workspace_area, None) + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_sets_token(self): + token = 'this is a token' + self.af.set_token(token) + self.assertEqual(self.af.get_token(), token) + + def test_lists_allows_environments(self): + allowed = self.af.get_allowed_environments() + self.assertTrue(isinstance(allowed, list)) + self.assertNotEqual(allowed, []) + + def test_creates_host_environment_by_default(self): + spec = { + 'shell': 'echo hello world', + } + env = self.af.create_environment(spec, self.project) + self.assertTrue(isinstance(env, ick2.HostEnvironment)) + + self.assertEqual( + env.get_workspace_directory(), + os.path.join(self.workspace_area, self.project)) + + def test_creates_host_environment_when_asked_explicitly(self): + spec = { + 'where': 'host', + } + env = self.af.create_environment(spec, self.project) + self.assertTrue(isinstance(env, ick2.HostEnvironment)) + + def test_creates_chroot_environment_by_default(self): + spec = { + 'where': 'chroot', + } + env = self.af.create_environment(spec, self.project) + self.assertTrue(isinstance(env, ick2.ChrootEnvironment)) + + def test_creates_container_environment_by_default(self): + spec = { + 'where': 'container', + } + env = self.af.create_environment(spec, self.project) + self.assertTrue(isinstance(env, ick2.ContainerEnvironment)) + + def test_creates_shell_action_on_host(self): + def url(blob_name): + return blob_name + + spec = { + 'shell': 'echo hello, world', + } + token = 'secret token' + self.af.set_token(token) + self.af.set_blob_url_func(url) + action = self.af.create_action(spec, self.project) + self.assertTrue(isinstance(action, ick2.ShellAction)) + self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment)) + self.assertEqual(action.get_token(), token) + self.assertEqual(action.get_blob_upload_url('foo'), 'foo') + + def test_creates_python_action_on_host(self): + spec = { + 'python': 'pass', + } + action = self.af.create_action(spec, self.project) + self.assertTrue(isinstance(action, ick2.PythonAction)) + self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment)) + + def test_creates_debootstrap_action_on_host(self): + spec = { + 'debootstrap': 'auto', + } + action = self.af.create_action(spec, self.project) + self.assertTrue(isinstance(action, ick2.DebootstrapAction)) + self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment)) + + def test_creates_create_workspace_action_on_host(self): + spec = { + 'action': 'create_workspace', + } + action = self.af.create_action(spec, self.project) + self.assertTrue(isinstance(action, ick2.CreateWorkspaceAction)) + self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment)) + + def test_creates_archive_workspace_action_on_host(self): + spec = { + 'archive': 'workspace', + } + action = self.af.create_action(spec, self.project) + self.assertTrue(isinstance(action, ick2.ArchiveWorkspaceAction)) + self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment)) + + def test_creates_populate_sysgtre_action_on_host(self): + spec = { + 'action': 'populate_systree', + } + action = self.af.create_action(spec, self.project) + self.assertTrue(isinstance(action, ick2.PopulateSystreeAction)) + self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment)) + + def test_raises_exception_for_unknown_step(self): + with self.assertRaises(ick2.UnknownStepError): + self.af.create_action({}, self.project) + + +class ShellActionTests(unittest.TestCase): + + def test_encodes_parameters(self): + params = { + 'foo': 'bar', + } + action = ick2.ShellAction(None) + encoded = action.encode64(params) + decoded = action.decode64(encoded) + self.assertEqual(params, decoded) + + def test_runs_argv(self): + params = {} + step = { + 'shell': 'echo hello, world', + } + env = DummyEnvironment() + env.exit_code = 42 + action = ick2.ShellAction(env) + exit_code = action.execute(params, step) + self.assertEqual(exit_code, env.exit_code) + + +class PythonActionTests(unittest.TestCase): + + def test_runs_argv(self): + params = {} + step = { + 'python': 'pass', + } + env = DummyEnvironment() + env.exit_code = 42 + action = ick2.PythonAction(env) + exit_code = action.execute(params, step) + self.assertEqual(exit_code, env.exit_code) + + +class DebootstrapActionTests(unittest.TestCase): + + def test_runs_argv(self): + params = { + 'debian_codename': 'stretch', + } + step = { + 'debootstrap': 'auto', + 'mirror': 'http://deb.debian.org/debian', + } + env = DummyEnvironment() + env.exit_code = 42 + action = ick2.DebootstrapAction(env) + exit_code = action.execute(params, step) + self.assertEqual(exit_code, env.exit_code) + + +class CreateWorkspaceActionTests(unittest.TestCase): + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_runs_argv(self): + params = {} + step = { + 'action': 'create_workspace', + } + env = DummyEnvironment(tempdir=self.tempdir) + action = ick2.CreateWorkspaceAction(env) + exit_code = action.execute(params, step) + self.assertEqual(exit_code, 0) + + +class DummyEnvironment: + + def __init__(self, tempdir=None): + self.exit_code = None + self.argv = None + self.tempdir = tempdir or tempfile.mkdtemp() + + def report(self, exit_code, msg): + pass + + def host_runcmd(self, argv, **kwargs): + self.argv = argv + return self.exit_code + + def runcmd(self, argv, **kwargs): + return self.host_runcmd(argv) + + def get_workspace_directory(self): + return self.tempdir diff --git a/ick2/client.py b/ick2/client.py new file mode 100644 index 0000000..99b8487 --- /dev/null +++ b/ick2/client.py @@ -0,0 +1,238 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import json +import logging +import time + + +import requests + + +class HttpError(Exception): + + pass + + +class HttpAPI: + + # Make requests to an HTTP API. + + json_type = 'application/json' + + def __init__(self): + self._session = requests.Session() + self._token = None + + def set_session(self, session): + self._session = session + + def set_token(self, token): + self._token = token + + def get_dict(self, url, headers=None): + r = self._request(self._session.get, url, headers=headers) + ct = r.headers.get('Content-Type') + if ct != self.json_type: + raise HttpError('Not JSON response') + try: + return r.json() + except json.decoder.JSONDecodeError: + raise HttpError('JSON parsing error') + + def get_blob(self, url, headers=None): + r = self._request(self._session.get, url, headers=headers) + return r.content + + def post(self, url, headers=None, body=None): + self._send_request(self._session.post, url, headers=headers, body=body) + return None + + def put(self, url, headers=None, body=None): + self._send_request(self._session.put, url, headers=headers, body=body) + return None + + def _send_request(self, func, url, headers=None, body=None): + if headers is None: + headers = {} + headers = dict(headers) + h, body = self._get_content_type_header(body) + headers.update(h) + self._request(func, url, headers=headers, data=body) + return None + + def _get_content_type_header(self, body): + if isinstance(body, dict): + header = { + 'Content-Type': 'application/json', + } + body = json.dumps(body) + return header, body + return {}, body + + def _get_authorization_headers(self): + return { + 'Authorization': 'Bearer {}'.format(self._token), + } + + def _request(self, func, url, headers=None, **kwargs): + if headers is None: + headers = {} + headers.update(self._get_authorization_headers()) + + logging.debug('request: func=%r', func) + logging.debug('request: url=%r', url) + for h in headers: + logging.debug('request: %s: %s', h, headers[h]) + logging.debug('request: kwargs=%r', kwargs) + + r = func(url, headers=headers, verify=False, **kwargs) + logging.debug('response: status_code=%r', r.status_code) + logging.debug('response: content=%r', r.content) + + if not r.ok: + raise HttpError(r.status_code) + return r + + +class ControllerClient: + + def __init__(self): + self._name = None + self._api = HttpAPI() + self._url = None + + def set_client_name(self, name): + self._name = name + + def set_http_api(self, api): + self._api = api + + def set_controller_url(self, url): + self._url = url + + def set_token(self, token): + self._api.set_token(token) + + def url(self, path): + return '{}{}'.format(self._url, path) + + def get_artifact_store_url(self): + url = self.url('/version') + version = self._api.get_dict(url) + url = version.get('artifact_store') + logging.info('Artifact store URL: %r', url) + return url + + def get_blob_client(self): + url = self.get_artifact_store_url() + blobs = BlobClient() + blobs.set_url(url) + blobs.set_http_api(self._api) + return blobs + + def register(self): + assert self._url is not None + url = self.url('/workers') + logging.info('Registering worker %s to %s', self._name, url) + body = { + 'worker': self._name, + } + try: + self._api.post(url, body=body) + except HttpError: + pass + + def get_work(self): + url = self.url('/work/{}'.format(self._name)) + work = self._api.get_dict(url) + logging.info('Requested work, got: %r', work) + return work + + def report_work(self, work): + logging.info('Reporting work: %r', work) + url = self.url('/work') + headers = { + 'Content-Type': self._api.json_type, + } + body = json.dumps(work) + self._api.post(url, headers=headers, body=body) + + +class Reporter: # pragma: no cover + + def __init__(self, api, work): + self._api = api + self._work = dict(work) + + def report(self, exit_code, stdout, stderr): + result = dict(self._work) + result['exit_code'] = exit_code + result['stdout'] = self.make_string(stdout) + result['stderr'] = self.make_string(stderr) + for key in result: + logging.debug('Reporter: result[%r]=%r', key, result[key]) + self._api.report_work(result) + + def make_string(self, thing): + if isinstance(thing, bytes): + return thing.decode('UTF-8') + if isinstance(thing, str): + return thing + if thing is None: + return thing + return repr(thing) + + def get_work_result(self, work): + return { + 'build_id': work['build_id'], + 'worker': work['worker'], + 'project': work['project'], + 'pipeline': work['pipeline'], + 'stdout': '', + 'stderr': '', + 'exit_code': None, + 'timestamp': self.now(), + } + + def now(self): + return time.strftime('%Y-%m-%dT%H:%M:%S') + + +class BlobClient: + + def __init__(self): + self._url = None + self._api = None + + def set_url(self, url): + self._url = url + + def set_http_api(self, api): + self._api = api + + def url(self, blob_name): + assert self._url is not None + return '{}/blobs/{}'.format(self._url, blob_name) + + def download(self, blob_name): + logging.info('Download blob %s', blob_name) + url = self.url(blob_name) + return self._api.get_blob(url) + + def upload(self, blob_name, blob): + logging.info('Upload blob %s', blob_name) + url = self.url(blob_name) + return self._api.put(url, body=blob) diff --git a/ick2/client_tests.py b/ick2/client_tests.py new file mode 100644 index 0000000..92da36a --- /dev/null +++ b/ick2/client_tests.py @@ -0,0 +1,274 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import json +import unittest + + +import ick2 + + +json_type = 'application/json' + + +class HttpAPITests(unittest.TestCase): + + def setUp(self): + self.session = FakeHttpSession() + self.session.token = 'SECRET-TOKEN' + self.client = ick2.HttpAPI() + self.client.set_session(self.session) + self.client.set_token(self.session.token) + + def test_get_dict_raises_exception_on_error(self): + self.session.response = FakeResponse(400) + with self.assertRaises(ick2.HttpError): + self.client.get_dict('http://controller/version') + + def test_get_dict_raises_exception_on_not_json(self): + self.session.response = FakeResponse(200, body=json.dumps('{}')) + with self.assertRaises(ick2.HttpError): + self.client.get_dict('http://controller/version') + + def test_get_dict_raises_exception_on_malformed_json(self): + self.session.response = FakeResponse( + 200, body='this is not really JSON', content_type=json_type) + with self.assertRaises(ick2.HttpError): + self.client.get_dict('http://controller/version') + + def test_get_dict_returns_response(self): + version = { + 'version': '1.0', + } + + self.session.response = FakeResponse( + 200, body=json.dumps(version), content_type=json_type) + obj = self.client.get_dict('http://controller/version') + self.assertEqual(obj, version) + + def test_get_blob_raises_exception_on_error(self): + self.session.response = FakeResponse(404) + with self.assertRaises(ick2.HttpError): + self.client.get_blob('http://blobs/foo') + + def test_get_blob_returns_response(self): + blob = b'hello, world\n' + self.session.response = FakeResponse(200, body=blob) + obj = self.client.get_blob('http://blobs/foo') + self.assertEqual(obj, blob) + + def test_post_raises_exception_on_error(self): + self.session.response = FakeResponse(400) + with self.assertRaises(ick2.HttpError): + self.client.post('http://controller/work', body='') + + def test_post_succeeds(self): + work = { + 'project': 'foo', + 'stdout': 'hello, world\n', + } + + self.session.response = FakeResponse( + 201, body=json.dumps(work), content_type=json_type) + obj = self.client.post('http://controller/work', body=work) + self.assertEqual(obj, None) + + def test_put_raises_exception_on_error(self): + blob = b'fooblob' + self.session.response = FakeResponse(400) + with self.assertRaises(ick2.HttpError): + self.client.put('http://blobs/foo', body=blob) + + def test_put_succeeds(self): + blob = b'fooblob' + self.session.response = FakeResponse(201, body=None) + obj = self.client.put('http://controller/work', body=blob) + self.assertEqual(obj, None) + + +class ControllerClientTests(unittest.TestCase): + + def setUp(self): + self.session = FakeHttpSession() + self.session.token = 'SECRET-TOKEN' + + self.client = ick2.HttpAPI() + self.client.set_session(self.session) + self.client.set_token(self.session.token) + + self.controller = ick2.ControllerClient() + self.controller.set_http_api(self.client) + self.controller.set_controller_url('https://controller') + self.controller.set_token('SECRET-TOKEN') + self.controller.set_client_name('asterix') + + def test_register_succeeds_on_error(self): + self.session.response = FakeResponse(400) + self.assertEqual(self.controller.register(), None) + + def test_register_succeeds(self): + self.session.response = FakeResponse(200) + self.assertEqual(self.controller.register(), None) + + def test_get_work_raises_exception_on_error(self): + self.session.response = FakeResponse(400) + with self.assertRaises(ick2.HttpError): + self.controller.get_work() + + def test_get_work_succeeds(self): + work = { + 'build_id': 'rome/1' + } + self.session.response = FakeResponse( + 200, body=json.dumps(work), content_type=json_type) + self.assertEqual(self.controller.get_work(), work) + + def test_report_work_raises_exception_on_error(self): + work = { + 'stdout': 'hello, world', + } + self.session.response = FakeResponse(400) + with self.assertRaises(ick2.HttpError): + self.controller.report_work(work) + + def test_report_work_succeeds(self): + work = { + 'stdout': 'hello, world', + } + self.session.response = FakeResponse(200) + self.assertEqual(self.controller.report_work(work), None) + + def test_get_artifact_store_url_raises_exception_on_error(self): + self.session.response = FakeResponse(400) + with self.assertRaises(ick2.HttpError): + self.controller.get_artifact_store_url() + + def test_get_artifact_store_url_succeeds(self): + url = 'https://blobs' + version = { + 'artifact_store': url, + } + self.session.response = FakeResponse( + 200, body=json.dumps(version), content_type=json_type) + self.assertEqual(self.controller.get_artifact_store_url(), url) + + +class BlobServiceClientTests(unittest.TestCase): + + def setUp(self): + self.session = FakeHttpSession() + self.session.token = 'SECRET-TOKEN' + + self.client = ick2.HttpAPI() + self.client.set_session(self.session) + self.client.set_token(self.session.token) + + self.controller = ick2.ControllerClient() + self.controller.set_http_api(self.client) + self.controller.set_controller_url('https://controller') + self.controller.set_token('SECRET-TOKEN') + self.controller.set_client_name('asterix') + + def get_blob_client(self): + url = 'https://blobs' + version = { + 'artifact_store': url, + } + self.session.response = FakeResponse( + 200, body=json.dumps(version), content_type=json_type) + + return self.controller.get_blob_client() + + def test_get_blob_client_raises_exception_on_error(self): + self.session.response = FakeResponse(400) + with self.assertRaises(ick2.HttpError): + self.controller.get_blob_client() + + def test_get_blob_client_succeeds(self): + blobs = self.get_blob_client() + self.assertTrue(isinstance(blobs, ick2.BlobClient)) + + def test_download_raises_exception_on_error(self): + blobs = self.get_blob_client() + self.session.response = FakeResponse(400) + with self.assertRaises(ick2.HttpError): + blobs.download('foo') + + def test_download_succeeds(self): + blobs = self.get_blob_client() + blob = b'hello, world' + self.session.response = FakeResponse(200, body=blob) + self.assertEqual(blobs.download('foo'), blob) + + def test_upload_raises_exception_on_error(self): + blobs = self.get_blob_client() + blob = b'hello, world' + self.session.response = FakeResponse(400) + with self.assertRaises(ick2.HttpError): + blobs.upload('foo', blob) + + def test_upload_succeeds(self): + blobs = self.get_blob_client() + blob = b'hello, world' + self.session.response = FakeResponse(200) + self.assertEqual(blobs.upload('foo', blob), None) + + +class FakeHttpSession: + + def __init__(self): + self.response = None + self.token = None + + def get(self, url, headers=None, verify=None): + assert self.response is not None + assert self.is_authorized(headers) + return self.response + + def post(self, url, headers=None, data=None, verify=None): + assert self.response is not None + assert self.is_authorized(headers) + return self.response + + def put(self, url, headers=None, data=None, verify=None): + assert self.response is not None + assert self.is_authorized(headers) + return self.response + + def is_authorized(self, headers): + assert self.token is not None + assert 'Authorization' in headers + v = headers['Authorization'] + return v == 'Bearer {}'.format(self.token) + + +class FakeResponse: + + def __init__(self, status_code, body=None, content_type=None): + if content_type is None: + content_type = 'application/octet-stream' + + self.status_code = status_code + self.headers = { + 'Content-Type': content_type, + } + self.content = body + + @property + def ok(self): + return self.status_code in (200, 201) + + def json(self): + return json.loads(self.content) diff --git a/ick2/workspace.py b/ick2/workspace.py new file mode 100644 index 0000000..38cf5e4 --- /dev/null +++ b/ick2/workspace.py @@ -0,0 +1,64 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import os +import shutil + + +class WorkspaceArea: + + def __init__(self): + self._root = None + + def set_root(self, root): + self._root = root + + def get_root(self): + return self._root + + def clear(self): # pragma: no cover + names = os.listdir(self._root) + for name in names: + pathname = os.path.join(self._root, name) + if os.path.isdir(pathname): + shutil.rmtree(pathname) + else: + os.remove(pathname) + + def create_workspace(self, name): + dirname = self.get_workspace_directory(name) + if not os.path.exists(dirname): + os.mkdir(dirname) + ws = Workspace() + ws.set_directory(dirname) + return ws + + def get_workspace_directory(self, name): + return os.path.join(self._root, name) + + +class Workspace: + + def __init__(self): + self._dirname = None + + def set_directory(self, dirname): + self._dirname = dirname + + def get_directory(self): + return self._dirname + + def destroy(self): + shutil.rmtree(self.get_directory()) diff --git a/ick2/workspace_tests.py b/ick2/workspace_tests.py new file mode 100644 index 0000000..fa5e788 --- /dev/null +++ b/ick2/workspace_tests.py @@ -0,0 +1,68 @@ +# Copyright (C) 2018 Lars Wirzenius +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import os +import shutil +import tempfile +import unittest + + +import ick2 + + +class WorkspaceAreaTests(unittest.TestCase): + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.area = ick2.WorkspaceArea() + self.area.set_root(self.tempdir) + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_sets_root(self): + self.assertEqual(self.area.get_root(), self.tempdir) + + def test_clears_workspace_area(self): + name = 'foo-project' + ws = self.area.create_workspace(name) + self.area.clear() + self.assertEqual(os.listdir(self.tempdir), []) + + def test_creates_workspace(self): + name = 'foo-project' + ws = self.area.create_workspace(name) + self.assertTrue(isinstance(ws, ick2.Workspace)) + dirname = os.path.join(self.tempdir, name) + self.assertEqual(ws.get_directory(), dirname) + self.assertTrue(os.path.isdir(dirname)) + + def test_creates_workspace_twice(self): + name = 'foo-project' + ws0 = self.area.create_workspace(name) + ws = self.area.create_workspace(name) + self.assertTrue(isinstance(ws, ick2.Workspace)) + dirname = os.path.join(self.tempdir, name) + self.assertEqual(ws.get_directory(), dirname) + self.assertTrue(os.path.isdir(dirname)) + + def test_destroys_nonempty_workspace(self): + name = 'foo-project' + ws = self.area.create_workspace(name) + dirname = ws.get_directory() + with open(os.path.join(dirname, 'file.txt'), 'w') as f: + f.write('hello, world') + ws.destroy() + self.assertFalse(os.path.exists(dirname)) diff --git a/worker_manager b/worker_manager index 7530469..ad01fd7 100755 --- a/worker_manager +++ b/worker_manager @@ -15,18 +15,12 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. -import base64 -import json import logging -import os -import subprocess -import tempfile import time import apifw import cliapp import Crypto.PublicKey.RSA -import requests import urllib3 import ick2 @@ -89,16 +83,25 @@ class WorkerManager(cliapp.Application): ) def process_args(self, args): + try: + self.main_program(args) + except BaseException as e: + logging.error(str(e), exc_info=True) + raise + + def main_program(self, args): self.settings.require('name') self.settings.require('controller') name = self.settings['name'] url = self.settings['controller'] + workspace = self.settings['workspace'] + systree = self.settings['systree'] + tg = TokenGenerator() tg.set_key(self.settings['token-key']) api = ControllerAPI(name, url, tg) - worker = Worker( - name, api, self.settings['workspace'], self.settings['systree']) + worker = Worker(name, api, workspace, systree) logging.info('Worker manager %s starts, controller is %s', name, url) @@ -118,106 +121,43 @@ class WorkerManager(cliapp.Application): class ControllerAPI: def __init__(self, name, url, token_generator): - self._name = name - self._url = url - self._blob_url = None self._token_generator = token_generator - self._httpapi = HttpApi() + self._cc = ick2.ControllerClient() + self._cc.set_client_name(name) + self._cc.set_controller_url(url) + self._blobs = None + + def get_token(self): + return self._token_generator.get_token() def register(self): - logging.info('Registering worker %s to %s', self._name, self._url) - url = self.url('/workers') - headers = self.get_auth_headers() - body = { - 'worker': self._name, - } - code = self._httpapi.post(url, headers, body) - if code not in [200, 201, 409]: - raise cliapp.AppException('Failed to register worker') + self._cc.set_token(self.get_token()) + self._cc.register() def get_work(self): - url = self.url('/work/{}'.format(self._name)) - headers = self.get_auth_headers() - work = self._httpapi.get(url, headers) - if work: - logging.info('Response: %r', work) - return work + self._cc.set_token(self.get_token()) + return self._cc.get_work() def report_work(self, work): - logging.info('POST %s', work) - url = self.url('/work') - headers = self.get_auth_headers() - code = self._httpapi.post(url, headers, work) - if code not in (200, 201): - raise cliapp.AppException( - 'Error posting data to controller: {}'.format(code)) - - def url(self, path): - return '{}{}'.format(self._url, path) - - def get_auth_headers(self): - token = self._token_generator.get_token() - return { - 'Authorization': 'Bearer {}'.format(token), - } + self._cc.set_token(self.get_token()) + self._cc.report_work(work) + + def get_blob_upload_url(self, name): + blobs = self.get_blob_client() + return blobs.url(name) def upload_blob(self, blob_id, blob): - logging.info('Upload blob %s', blob_id) - url = self.bloburl(blob_id) - headers = self.get_auth_headers() - code = self._httpapi.put(url, headers, blob) + blobs = self.get_blob_client() + blobs.upload(blob_id, blob) def download_blob(self, blob_id): - logging.info('Download blob %s', blob_id) - url = self.bloburl(blob_id) - headers = self.get_auth_headers() - return self._httpapi.get_blob(url, headers) - - def bloburl(self, blob_id): - if self._blob_url is None: - self._blob_url = self.get_artifact_store_url() - if self._blob_url is not None: - return '{}/blobs/{}'.format(self._blob_url, blob_id) - logging.error('Do not artifact store URL') - return None - - def get_artifact_store_url(self): - url = self.url('/version') - headers = self.get_auth_headers() - version = self._httpapi.get(url, headers) - logging.info('Version: %r', version) - if version: - return version.get('artifact_store') - - -class HttpApi: - - def __init__(self): - self._session = requests.Session() - - def post(self, url, headers, body): - r = self._session.post(url, json=body, headers=headers, verify=False) - if not r.ok: - logging.warning('Error: POST %s returned %s', url, r.status_code) - return r.status_code + blobs = self.get_blob_client() + return blobs.download(blob_id) - def put(self, url, headers, body): - r = self._session.put(url, data=body, headers=headers, verify=False) - if not r.ok: - logging.warning('Error: PUT %s returned %s', url, r.status_code) - return r.status_code - - def get(self, url, headers): - r = self._session.get(url, headers=headers, verify=False) - if not r.ok or not r.text: - return None - return r.json() - - def get_blob(self, url, headers): - r = self._session.get(url, headers=headers, verify=False) - if not r.ok: - return None - return r.content + def get_blob_client(self): + if self._blobs is None: + self._blobs = self._cc.get_blob_client() + return self._blobs class TokenGenerator: @@ -290,353 +230,23 @@ class Worker: self._systree = systree def do_work(self, work): - - def post(stream_name, data): - s = self.get_work_result(work) - s[stream_name] = data - self._api.report_work(s) - - step = work['step'] - logging.info('Running step: %r', step) - exit_code = 0 - if exit_code == 0: - klass = self.worker_factory(step) - if klass is None: - exit_code = -1 - else: - worker = klass(self._api, self._workspace, self._systree, post) - exit_code = worker.do(work) - self.finish_work(work, exit_code) - - def get_work_result(self, work): - return { - 'build_id': work['build_id'], - 'worker': self._name, - 'project': work['project'], - 'pipeline': work['pipeline'], - 'stdout': '', - 'stderr': '', - 'exit_code': None, - 'timestamp': self.now(), - } - - def now(self): - return time.strftime('%Y-%m-%dT%H:%M:%S') - - def worker_factory(self, step): - table = [ - ('shell', None, ShellWorker), - ('python', None, PythonWorker), - ('debootstrap', None, DebootstrapWorker), - ('archive', None, WorkspaceArchiver), - ('action', 'populate_systree', SystreePopulator), - ('action', 'create_workspace', WorkspaceCleaner), - ] - - for key, value, klass in table: - if key in step and (value is None or step[key] == value): - return klass - - logging.warning('Cannot find worker for %s', step) - return None - - def finish_work(self, work, exit_code): - s = self.get_work_result(work) - s['exit_code'] = exit_code - self._api.report_work(s) - - -class Runner: - - def __init__(self, post_output): - self._post = post_output - self._buffers = {'stdout': '', 'stderr': ''} - self._maxbuf = 1024 * 256 - self._timeout = 1.0 - - def runcmd(self, argv, **kwargs): - logging.debug('Runner.runcmd: argv=%r %r', argv, kwargs) - exit_code, _, _ = cliapp.runcmd_unchecked( - argv, - stdout_callback=self.capture_stdout, - stderr=subprocess.STDOUT, - output_timeout=self._timeout, - timeout_callback=self.flush, - **kwargs - ) - self.flush() - logging.debug('Runner.runcmd: finished, exit_code=%r', exit_code) - return exit_code - - def capture_stdout(self, data): - return self.capture('stdout', data) - - def capture_stderr(self, data): - return self.capture('stderr', data) - - def capture(self, stream_name, data): - logging.debug('CAPTURE %s: %r', stream_name, data) - self._buffers[stream_name] += data.decode('UTF-8') - if len(self._buffers[stream_name]) >= self._maxbuf: - self.flush() - return b'' - - def flush(self): - logging.debug('flushing: self._buffers=%r', self._buffers) - streams = list(self._buffers.keys()) - logging.debug('streams: %r', streams) - for stream in streams: - buf = self._buffers[stream] - logging.debug('FLUSH: %s: %r', stream, buf) - self._buffers[stream] = '' - if buf: - logging.debug('Posting %d bytes of %s', len(buf), stream) - self._post(stream, buf) - - -class Mounter: - - def __init__(self, mounts, runner): - self._mounts = mounts - self._runner = runner - - def __enter__(self): - self.mount() - return self - - def __exit__(self, *args): - self.unmount() - - def mount(self): - for dirname, mp in self._mounts: - if not os.path.exists(mp): - os.mkdir(mp) - self._runner.runcmd(['sudo', 'mount', '--bind', dirname, mp]) - - def unmount(self): - for dirname, mp in reversed(self._mounts): - self._runner.runcmd(['sudo', 'umount', mp]) - - -class WorkerBase: - - def __init__(self, api, workspace, systree, post): - self._api = api - self._workspace = workspace - self._systree = systree - self._post = post - - def do(self, work): - params = work.get('parameters', {}) - params_text = self.params64(params) - argv = self.get_argv(work, params_text) - mounts = [] - env = dict(os.environ) - env.update({ - 'LC_ALL': 'C', - 'DEBIAN_FRONTEND': 'noninteractive', - }) - if self.where(work) == 'chroot': - logging.debug('CHROOT REQUESTED') - argv = ['sudo', 'chroot', self._workspace] + argv - mounts = [ - ('/proc', os.path.join(self._workspace, 'proc')), - ('/sys', os.path.join(self._workspace, 'sys')), - ] - elif self.where(work) == 'container': - logging.debug('CONTAINER REQUESTED') - bind = '{}:/workspace'.format(self._workspace) - mp = '{}/workspace'.format(self._workspace) - if not os.path.exists(mp): - os.mkdir(mp) - argv = [ - 'sudo', 'systemd-nspawn', '-D', self._systree, '--bind', bind, - '--chdir', '/workspace', - ] + argv - else: - logging.debug('HOST REQUESTED') - runner = Runner(self._post) - with Mounter(mounts, runner): - return runner.runcmd(argv, cwd=self._workspace, env=env) - - def params64(self, params): - as_json = json.dumps(params) - as_bytes = as_json.encode('UTF-8') - as_base64 = base64.b64encode(as_bytes) - return as_base64.decode('UTF-8') - - def where(self, work): + logging.debug('Doing work: %r', work) + project_name = work['project'] step = work.get('step', {}) - return step.get('where') - - def get_argv(self, work, params_text): - raise NotImplementedError() - - -class ShellWorker(WorkerBase): - - def get_argv(self, work, params_text): - step = work['step'] - code_snippet = step['shell'] - where = step.get('where', 'host') - prefix = 'params() { echo -n "%s" | base64 -d; }\n' % params_text - return ['bash', '-exuc', prefix + code_snippet] - - -class PythonWorker(WorkerBase): - - def get_argv(self, work, params_text): - prefix = ( - 'import base64, json\n' - 'params = json.loads(base64.b64decode(\n' - ' "{}").decode("utf8"))\n' - ).format(params_text) - code_snippet = work['step']['python'] - return ['python3', '-c', prefix + '\n' + code_snippet] - - -class DebootstrapWorker(WorkerBase): - - def get_argv(self, work, params_text): - step = work['step'] - params = work.get('parameters', {}) - if step['debootstrap'] == 'auto': - suite = params['debian_codename'] - else: - suite = step['debootstrap'] - - return [ - 'sudo', - 'debootstrap', - suite, - '.', - step.get('mirror', 'http://deb.debian.org/debian'), - ] - - -class WorkspaceCleaner(WorkerBase): - - def get_argv(self, work, params_text): - return ['sudo', 'find', '.', '-delete'] - - -class WorkspaceArchiver(WorkerBase): - - def do(self, work): + logging.debug('Doing work: step=%r', step) params = work.get('parameters', {}) - blob_id = params.get('systree_name', 'noname') - step = work['step'] - dirname = step['archive'] - - if dirname == 'workspace': - dirname = self._workspace - - runner = Runner(self._post) - - temp = self.archive(runner, dirname) - if isinstance(temp, int): - return temp - - result = self.upload(runner, temp, blob_id) - os.remove(temp) - - return result - - def archive(self, runner, dirname): - logging.info('Archiving %s', dirname) - self.report(b'Archiving workspace\n') - - fd, filename = tempfile.mkstemp() - os.close(fd) - - archive = ['sudo', 'tar', '-zcf', filename, '-C', dirname, '.'] - exit_code = runner.runcmd(archive) - if exit_code != 0: - os.remove(filename) - return exit_code - - return filename - - def upload(self, runner, filename, blob_id): - logging.info('Uploading %s', blob_id) - self.report(b'Uploading workspace\n') - - url = self._api.bloburl(blob_id) - headers = self._api.get_auth_headers() - upload = ['curl', '-sk', '-T', filename] - for name, value in headers.items(): - header = '{}: {}'.format(name, value) - upload.extend(['-H', header]) - upload.append(url) - - exit_code = runner.runcmd(upload) - if exit_code == 0: - self.report(b'Uploaded tarball of workspace\n') - - return exit_code - - def execute_argv(self, *argvs, **kwargs): - exit_code, _, _ = cliapp.runcmd_unchecked( - *argvs, - **kwargs, - ) - return exit_code - - def report(self, data): - self._post('stdout', data.decode('UTF-8')) - return b'' - - def get_argv(self, work, params_text): - assert False - - -class SystreePopulator(WorkerBase): - - systree_dir = '/var/lib/ick/systree' - - def do(self, work): - step = work['step'] - systree_name = step.get('systree_name') - if not systree_name: - self.report( - b'No systree_name field in action, no systree population\n') - return 1 - - self.make_directory_empty(self.systree_dir) - - self.report(b'Downloading and unpacking systree blob\n') - return self.download_and_unpack_systree(systree_name, self.systree_dir) - - def make_directory_empty(self, dirname): - return self.execute_argv(['sudo', 'find', '-delete'], cwd=dirname) - - def download_and_unpack_systree(self, systree_name, dirname): - url = self._api.bloburl(systree_name) - headers = self._api.get_auth_headers() - download = ['curl', '-k'] - for name, value in headers.items(): - header = '{}: {}'.format(name, value) - download.extend(['-H', header]) - download.append(url) - - unpack = ['sudo', 'tar', '-zxf', '-', '-C', dirname] - - return self.execute_argv(download, unpack) - - def execute_argv(self, *argvs, **kwargs): - exit_code, _, _ = cliapp.runcmd_unchecked( - *argvs, - stdout_callback=self.report, - stderr_callback=self.report, - **kwargs, - ) - return exit_code - - def report(self, data): - self._post('stdout', data.decode('UTF-8')) - - def get_argv(self, work, params_text): - assert False + logging.debug('Doing work: params=%r', params) + reporter = ick2.Reporter(self._api, work) + logging.debug('Doing work: reporter=%r', reporter) + af = ick2.ActionFactory(self._systree, self._workspace, reporter) + logging.debug('Doing work: af=%r', af) + af.set_token(self._api.get_token()) + af.set_blob_url_func(self._api.get_blob_upload_url) + action = af.create_action(step, project_name) + logging.debug('Doing work: action=%r', action) + exit_code = action.execute(params, step) + logging.debug('Action finished: exit_code=%r', exit_code) + logging.debug('Finished work: %r', work) if __name__ == '__main__': |