summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2018-03-30 12:13:53 +0300
committerLars Wirzenius <liw@liw.fi>2018-03-30 12:13:53 +0300
commit5a93f67af48df039fac38aa064ec96834232e72e (patch)
tree1e95c895d08022e94937cceb9efc363c7b1c3575
parent3f8b808f63fa5b321e45644df2c83f8d75aa623d (diff)
parent17253c6e422b52988b5abef43c2b797a5d2a6e8d (diff)
downloadick2-5a93f67af48df039fac38aa064ec96834232e72e.tar.gz
Merge: rewrite worker manager
-rw-r--r--NEWS3
-rwxr-xr-xcheck35
-rw-r--r--ick2/__init__.py26
-rw-r--r--ick2/actionenvs.py167
-rw-r--r--ick2/actionenvs_tests.py138
-rw-r--r--ick2/actions.py293
-rw-r--r--ick2/actions_tests.py230
-rw-r--r--ick2/client.py238
-rw-r--r--ick2/client_tests.py274
-rw-r--r--ick2/workspace.py64
-rw-r--r--ick2/workspace_tests.py68
-rwxr-xr-xworker_manager492
12 files changed, 1573 insertions, 455 deletions
diff --git a/NEWS b/NEWS
index 718ea95..05c264d 100644
--- a/NEWS
+++ b/NEWS
@@ -25,6 +25,9 @@ Version 0.28+git, not yet released
* Artifact store can handle large artifacts now. Tested with 100
gigabytes.
+* The worker manager has been largely rewritten, with (unit) tests
+ added. The new code is more maintainable and more easily extensible.
+
Version 0.28, released 2018-02-24
----------------------------------
diff --git a/check b/check
index 9519fac..4ba5b42 100755
--- a/check
+++ b/check
@@ -33,23 +33,27 @@ title()
title Remote or local yarns?
remote=no
unit=yes
+yarns=yes
if [ "$#" -gt 0 ]
then
case "$1" in
https://*)
remote=yes
unit=no
+ yarns=yes
remote_url="$1"
shift 1
;;
yarns)
remote=no
unit=no
+ yarns=yes
shift 1
;;
local)
remote=no
unit=yes
+ yarns=no
shift 1
;;
*)
@@ -85,22 +89,25 @@ then
pylint3 --rcfile pylint.conf $python_sources
fi
-title Yarns
-if [ "$remote" = no ]
+if [ "$yarns" = yes ]
then
- impl=yarns/900-local.yarn
- args=""
-else
- impl=yarns/900-remote.yarn
- args="--env ICK_URL=$remote_url"
+ title Yarns
+ if [ "$remote" = no ]
+ then
+ impl=yarns/900-local.yarn
+ args=""
+ else
+ impl=yarns/900-remote.yarn
+ args="--env ICK_URL=$remote_url"
+ fi
+ yarn yarns/[^9]*.yarn yarns/900-implements.yarn "$impl" \
+ --shell python2 \
+ --shell-arg '' \
+ --shell-library yarns/lib.py \
+ --cd-datadir \
+ $args \
+ "$@"
fi
-yarn yarns/[^9]*.yarn yarns/900-implements.yarn "$impl" \
- --shell python2 \
- --shell-arg '' \
- --shell-library yarns/lib.py \
- --cd-datadir \
- $args \
- "$@"
title OK
echo "All tests pass"
diff --git a/ick2/__init__.py b/ick2/__init__.py
index f5770f6..8673046 100644
--- a/ick2/__init__.py
+++ b/ick2/__init__.py
@@ -51,3 +51,29 @@ from .controllerapi import (
)
from .blobapi import BlobAPI
from .blob_store import BlobStore
+
+from .client import (
+ HttpAPI,
+ HttpError,
+ ControllerClient,
+ BlobClient,
+ Reporter,
+)
+from .actionenvs import (
+ Runner,
+ ActionEnvironment,
+ HostEnvironment,
+ ChrootEnvironment,
+ ContainerEnvironment,
+)
+from .workspace import WorkspaceArea, Workspace
+from .actions import (
+ ActionFactory,
+ UnknownStepError,
+ ShellAction,
+ PythonAction,
+ DebootstrapAction,
+ CreateWorkspaceAction,
+ ArchiveWorkspaceAction,
+ PopulateSystreeAction,
+)
diff --git a/ick2/actionenvs.py b/ick2/actionenvs.py
new file mode 100644
index 0000000..7a2b57a
--- /dev/null
+++ b/ick2/actionenvs.py
@@ -0,0 +1,167 @@
+# Copyright (C) 2018 Lars Wirzenius
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import logging
+import os
+import subprocess
+
+
+import cliapp
+
+
+class Runner:
+
+ def __init__(self, reporter, maxbuf=128*1024):
+ self._reporter = reporter
+ self._buffers = {
+ 'stdout': b'',
+ 'stderr': b'',
+ }
+ self._maxbuf = maxbuf
+ self._timeout = 1.0
+
+ def runcmd(self, *argvs, **kwargs):
+ for argv in argvs:
+ logging.debug('Runner.runcmd: argv: %r', argv)
+ for key in kwargs:
+ logging.debug('Runner.runcmd: kwargs: %r=%r', key, kwargs[key])
+ assert all(argv is not None for argv in argvs)
+ exit_code, _, _ = cliapp.runcmd_unchecked(
+ *argvs,
+ stdout_callback=self.capture_stdout,
+ stderr=subprocess.STDOUT,
+ output_timeout=self._timeout,
+ timeout_callback=self.flush,
+ **kwargs
+ )
+ self.flush()
+ logging.debug('Runner.runcmd: finished, exit code: %d', exit_code)
+ return exit_code
+
+ def capture_stdout(self, data):
+ return self.capture('stdout', data)
+
+ def capture(self, stream_name, data):
+ self._buffers[stream_name] += data
+ if len(self._buffers[stream_name]) >= self._maxbuf:
+ self.flush()
+
+ return b''
+
+ def flush(self):
+ stdout = self._buffers['stdout']
+ stderr = self._buffers['stderr']
+ self._reporter.report(None, stdout, stderr)
+ self._buffers['stdout'] = b''
+ self._buffers['stderr'] = b''
+
+
+class Mounter: # pragma: no cover
+
+ def __init__(self, mounts, runner):
+ self._mounts = mounts
+ self._runner = runner
+
+ def __enter__(self):
+ self.mount()
+ return self
+
+ def __exit__(self, *args):
+ self.unmount()
+
+ def mount(self):
+ for dirname, mp in self._mounts:
+ if not os.path.exists(mp):
+ os.mkdir(mp)
+ self._runner.runcmd(['sudo', 'mount', '--bind', dirname, mp])
+
+ def unmount(self):
+ for dirname, mp in reversed(self._mounts):
+ try:
+ self._runner.runcmd(['sudo', 'umount', mp])
+ except BaseException as e:
+ logging.error(
+ 'Ignoring error while unmounting %s: %s', mp, str(e))
+
+
+class ActionEnvironment: # pragma: no cover
+
+ def __init__(self, systree, workspace, reporter):
+ super().__init__()
+ self._systree = systree
+ self._workspace = workspace
+ self._reporter = reporter
+
+ def get_systree_directory(self):
+ return self._systree
+
+ def get_workspace_directory(self):
+ return self._workspace
+
+ def get_mounts(self):
+ return []
+
+ def report(self, exit_code, msg):
+ self._reporter.report(exit_code, msg, None)
+
+ def runcmd(self, argv):
+ raise NotImplementedError()
+
+ def host_runcmd(self, *argvs, cwd=None):
+ env = self.get_env_vars()
+ runner = Runner(self._reporter)
+ mounts = self.get_mounts()
+ with Mounter(mounts, runner):
+ return runner.runcmd(*argvs, cwd=cwd, env=env)
+
+ def get_env_vars(self):
+ env = dict(os.environ)
+ env.update({
+ 'LC_ALL': 'C',
+ 'DEBIAN_FRONTEND': 'noninteractive',
+ })
+ return env
+
+
+class HostEnvironment(ActionEnvironment):
+
+ def runcmd(self, argv):
+ return self.host_runcmd(argv, cwd=self._workspace)
+
+
+class ChrootEnvironment(ActionEnvironment):
+
+ def get_mounts(self): # pragma: no cover
+ return [
+ ('/proc', os.path.join(self._workspace, 'proc')),
+ ('/sys', os.path.join(self._workspace, 'sys')),
+ ]
+
+ def runcmd(self, argv):
+ prefix = ['sudo', 'chroot', self._workspace]
+ return self.host_runcmd(prefix + argv)
+
+
+class ContainerEnvironment(ActionEnvironment):
+
+ def runcmd(self, argv):
+ bind = '{}:/workspace'.format(self._workspace)
+ prefix = [
+ 'sudo', 'systemd-nspawn',
+ '-D', self._systree,
+ '--bind', bind,
+ '--chdir', '/workspace',
+ ]
+ return self.host_runcmd(prefix + argv)
diff --git a/ick2/actionenvs_tests.py b/ick2/actionenvs_tests.py
new file mode 100644
index 0000000..4643527
--- /dev/null
+++ b/ick2/actionenvs_tests.py
@@ -0,0 +1,138 @@
+# Copyright (C) 2018 Lars Wirzenius
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import unittest
+
+
+import ick2
+
+
+class FakeReporter:
+
+ def __init__(self):
+ self.stdout = b''
+ self.stderr = b''
+
+ def report(self, exit_code, stdout, stderr):
+ self.stdout += stdout
+ self.stderr += stderr
+
+
+class RunnerTests(unittest.TestCase):
+
+ def test_runs_echo(self):
+ reporter = FakeReporter()
+ r = ick2.Runner(reporter)
+ exit_code = r.runcmd(['echo', 'hello', 'world'])
+ self.assertEqual(exit_code, 0)
+ self.assertEqual(reporter.stdout, b'hello world\n')
+
+ def test_captures_lots_of_output(self):
+ reporter = FakeReporter()
+ r = ick2.Runner(reporter, maxbuf=1)
+ exit_code = r.runcmd(['echo', 'hello', 'world'])
+ self.assertEqual(exit_code, 0)
+ self.assertEqual(reporter.stdout, b'hello world\n')
+
+ def test_captures_stderr(self):
+ reporter = FakeReporter()
+ r = ick2.Runner(reporter)
+ exit_code = r.runcmd(['sh', '-c', 'echo eek 1>&2 ; exit 2'])
+ self.assertEqual(exit_code, 2)
+ self.assertEqual(reporter.stdout, b'eek\n')
+
+
+class EnvironmentTestsBase(unittest.TestCase):
+
+ def setUp(self):
+ self.workspace = '/workspace'
+ self.fake = FakeRuncmd()
+ self.env = self.create_env()
+ self.env.host_runcmd = self.fake
+
+ def create_env(self):
+ return DummyEnvironment(None, self.workspace, None)
+
+ def expected_argv(self, argv):
+ return argv
+
+ def test_runs_argv(self):
+ argv = ['echo', 'hello', 'world']
+ self.fake.exit_code = 42
+ exit_code = self.env.runcmd(argv)
+ self.assertEqual(self.fake.argv, self.expected_argv(argv))
+ self.assertEqual(self.fake.exit_code, exit_code)
+
+
+class DummyEnvironment(ick2.ActionEnvironment):
+
+ def runcmd(self, argv):
+ return self.host_runcmd(argv)
+
+
+class HostEnvironmentTests(EnvironmentTestsBase):
+
+ def create_env(self):
+ return ick2.HostEnvironment(None, self.workspace, None)
+
+ def expected_argv(self, argv):
+ return argv
+
+ def test_pipeline(self):
+ reporter = FakeReporter()
+ env = ick2.HostEnvironment(None, None, reporter)
+
+ echo = ['echo', 'hello', 'world']
+ cat = ['cat']
+ exit_code = env.host_runcmd(echo, cat)
+ self.assertEqual(exit_code, 0)
+ self.assertEqual(reporter.stdout, b'hello world\n')
+ self.assertEqual(reporter.stderr, b'')
+
+
+class ChrootEnvironmentTests(EnvironmentTestsBase):
+
+ def create_env(self):
+ return ick2.ChrootEnvironment(None, self.workspace, None)
+
+ def expected_argv(self, argv):
+ return ['sudo', 'chroot', self.workspace] + argv
+
+
+class ContainerEnvironmentTests(EnvironmentTestsBase):
+
+ def create_env(self):
+ self.systree = 'systree'
+ self.workspace = 'workspace'
+ return ick2.ContainerEnvironment(self.systree, self.workspace, None)
+
+ def expected_argv(self, argv):
+ prefix = [
+ 'sudo', 'systemd-nspawn', '-D', self.systree,
+ '--bind', '{}:/workspace'.format(self.workspace),
+ '--chdir', '/workspace',
+ ]
+ return prefix + argv
+
+
+class FakeRuncmd:
+
+ def __init__(self):
+ self.argv = None
+ self.exit_code = None
+
+ def __call__(self, argv, cwd=None):
+ self.argv = argv
+ return self.exit_code
diff --git a/ick2/actions.py b/ick2/actions.py
new file mode 100644
index 0000000..fef6889
--- /dev/null
+++ b/ick2/actions.py
@@ -0,0 +1,293 @@
+# Copyright (C) 2018 Lars Wirzenius
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import base64
+import json
+import logging
+import os
+import tempfile
+
+
+import cliapp
+
+
+import ick2
+
+
+class UnknownStepError(Exception):
+
+ pass
+
+
+class ActionFactory:
+
+ _classes = {
+ 'host': ick2.HostEnvironment,
+ 'chroot': ick2.ChrootEnvironment,
+ 'container': ick2.ContainerEnvironment,
+ }
+
+ def __init__(self, systree, workspace_area, reporter):
+ self._systree = systree
+ self._workspace_area = workspace_area
+ self._reporter = reporter
+ self._token = None
+ self._blob_url = None
+
+ def set_token(self, token):
+ self._token = token
+
+ def get_token(self):
+ return self._token
+
+ def set_blob_url_func(self, func):
+ self._blob_url = func
+
+ def get_blob_url_func(self):
+ return self._blob_url
+
+ def get_allowed_environments(self):
+ return list(self._classes.keys())
+
+ def create_environment(self, spec, project_name):
+ env = spec.get('where', 'host')
+ assert env in self.get_allowed_environments()
+ env_class = self._classes[env]
+ area = ick2.WorkspaceArea()
+ area.set_root(self._workspace_area)
+ ws = area.create_workspace(project_name)
+ return env_class(self._systree, ws.get_directory(), self._reporter)
+
+ def create_action(self, spec, project_name):
+ env = self.create_environment(spec, project_name)
+ action = self._create_action_object(env, spec)
+ action.set_token(self.get_token())
+ action.set_blob_url_func(self.get_blob_url_func())
+ return action
+
+ def _create_action_object(self, env, spec):
+ if 'shell' in spec:
+ return ShellAction(env)
+ if 'python' in spec:
+ return PythonAction(env)
+ if 'debootstrap' in spec:
+ return DebootstrapAction(env)
+ if 'archive' in spec:
+ return ArchiveWorkspaceAction(env)
+ if spec.get('action') == 'populate_systree':
+ return PopulateSystreeAction(env)
+ if spec.get('action') == 'create_workspace':
+ return CreateWorkspaceAction(env)
+ raise UnknownStepError('Unknown action %r' % spec)
+
+
+class Action: # pragma: no cover
+
+ def __init__(self, env):
+ self._env = env
+ self._token = None
+ self._blob_url = None
+
+ def set_token(self, token):
+ self._token = token
+
+ def get_token(self):
+ return self._token
+
+ def set_blob_url_func(self, func):
+ self._blob_url = func
+
+ def get_blob_upload_url(self, blob_name):
+ return self._blob_url(blob_name)
+
+ def get_env(self):
+ return self._env
+
+ def get_workspace_area(self):
+ env = self.get_env()
+ area = ick2.WorkspaceArea()
+ area.set_root(env.get_workspace_directory())
+ return area
+
+ def encode_parameters(self, parsms):
+ raise NotImplementedError()
+
+ def encode64(self, params):
+ assert isinstance(params, dict)
+ as_text = json.dumps(params)
+ as_bytes = as_text.encode('UTF-8')
+ as_base64 = base64.b64encode(as_bytes)
+ return as_base64.decode('UTF-8')
+
+ def decode64(self, encoded):
+ as_base64 = encoded.encode('UTF-8')
+ as_bytes = base64.b64decode(as_base64)
+ as_text = as_bytes.decode('UTF-8')
+ return json.loads(as_text)
+
+ def get_authz_headers(self):
+ token = self.get_token()
+ return {
+ 'Authorization': 'Bearer {}'.format(token),
+ }
+
+ def execute(self, params, work):
+ raise NotImplementedError()
+
+
+class ShellAction(Action):
+
+ def encode_parameters(self, params): # pragma: no cover
+ encoded = self.encode64(params)
+ return 'params() { echo -n "%s" | base64 -d; }\n' % encoded
+
+ def execute(self, params, step):
+ prefix = self.encode_parameters(params)
+ snippet = step['shell']
+ argv = ['bash', '-exuc', prefix + snippet]
+ exit_code = self._env.runcmd(argv)
+ self._env.report(exit_code, 'action finished\n')
+ return exit_code
+
+
+class PythonAction(Action):
+
+ def encode_parameters(self, params): # pragma: no cover
+ encoded = self.encode64(params)
+ prefix = (
+ 'import base64, json\n'
+ 'params = json.loads(base64.b64decode(\n'
+ ' "{}").decode("utf8"))\n'
+ ).format(encoded)
+ return prefix
+
+ def execute(self, params, step):
+ prefix = self.encode_parameters(params)
+ snippet = step['python']
+ argv = ['python3', '-c', prefix + '\n' + snippet]
+ exit_code = self._env.runcmd(argv)
+ self._env.report(exit_code, 'action finished\n')
+ return exit_code
+
+
+class DebootstrapAction(Action):
+
+ default_mirror = 'http://deb.debian.org/debian'
+
+ def encode_parameters(self, params): # pragma: no cover
+ pass
+
+ def execute(self, params, step):
+ suite = step.get('debootstrap')
+ if suite is None or suite == 'auto':
+ suite = params['debian_codename']
+ mirror = step.get('mirror', self.default_mirror)
+
+ env = self.get_env()
+ workspace = env.get_workspace_directory()
+ argv = ['sudo', 'debootstrap', suite, '.', mirror]
+ exit_code = self._env.host_runcmd(argv, cwd=workspace)
+ self._env.report(exit_code, 'action finished\n')
+ return exit_code
+
+
+class CreateWorkspaceAction(Action):
+
+ def encode_parameters(self, params): # pragma: no cover
+ pass
+
+ def execute(self, params, step):
+ logging.debug('CreateWorkspaceAction: params=%r', params)
+ logging.debug('CreateWorkspaceAction: step=%r', step)
+
+ env = self.get_env()
+ workspace = env.get_workspace_directory()
+ self._env.report(0, 'Created workspace %s\n' % workspace)
+ return 0
+
+
+class ArchiveWorkspaceAction(Action): # pragma: no cover
+
+ def encode_parameters(self, params):
+ pass
+
+ def execute(self, params, step):
+ blob_name = params.get('systree_name', 'noname')
+
+ env = self.get_env()
+ dirname = env.get_workspace_directory()
+
+ url = self.get_blob_upload_url(blob_name)
+ headers = self.get_authz_headers()
+
+ logging.debug('ArchiveWorkspaceAction: url=%r', url)
+ logging.debug('ArchiveWorkspaceAction: headers=%r', headers)
+
+ assert url is not None
+ assert headers is not None
+
+ fd, tarball = tempfile.mkstemp()
+ os.close(fd)
+ tar = ['sudo', 'tar', '-zcf', tarball, '-C', dirname, '.']
+ exit_code = self._env.host_runcmd(tar)
+ if exit_code != 0:
+ self._env.report(exit_code, 'tarball generation finished\n')
+ os.remove(tarball)
+ return exit_code
+ self._env.report(None, 'tarball generation finished\n')
+
+ curl = ['curl', '-sk', '-T', tarball] + [
+ '-H{}:{}'.format(name, value)
+ for name, value in headers.items()
+ ] + [url]
+ exit_code = self._env.host_runcmd(curl)
+ self._env.report(
+ exit_code, 'curl upload finished (exit code %s)\n' % exit_code)
+ os.remove(tarball)
+ return exit_code
+
+
+class PopulateSystreeAction(Action): # pragma: no cover
+
+ def encode_parameters(self, params):
+ pass
+
+ def execute(self, params, step):
+ systree_name = step.get('systree_name')
+ if not systree_name:
+ return 1
+
+ env = self.get_env()
+ systree_dir = env.get_systree_directory()
+ self.make_directory_empty(systree_dir)
+ return self.download_and_unpack_systree(systree_name, systree_dir)
+
+ def make_directory_empty(self, dirname):
+ return cliapp.runcmd(
+ ['sudo', 'find', dirname, '-mindepth', '1', '-delete'])
+
+ def download_and_unpack_systree(self, systree_name, dirname):
+ url = self.get_blob_upload_url(systree_name)
+ headers = self.get_authz_headers()
+ curl = ['curl', '-sk'] + [
+ '-H{}:{}'.format(name, value)
+ for name, value in headers.items()
+ ] + [url]
+
+ untar = ['sudo', 'tar', '-zxf', '-', '-C', dirname]
+
+ exit_code = self._env.host_runcmd(curl, untar)
+ self._env.report(exit_code, 'action finished\n')
+ return exit_code
diff --git a/ick2/actions_tests.py b/ick2/actions_tests.py
new file mode 100644
index 0000000..8220fe2
--- /dev/null
+++ b/ick2/actions_tests.py
@@ -0,0 +1,230 @@
+# Copyright (C) 2018 Lars Wirzenius
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import os
+import shutil
+import tempfile
+import unittest
+
+
+import ick2
+
+
+class ActionFactoryTests(unittest.TestCase):
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.workspace_area = self.tempdir
+ self.project = 'magic'
+ self.af = ick2.ActionFactory('systree', self.workspace_area, None)
+
+ def tearDown(self):
+ shutil.rmtree(self.tempdir)
+
+ def test_sets_token(self):
+ token = 'this is a token'
+ self.af.set_token(token)
+ self.assertEqual(self.af.get_token(), token)
+
+ def test_lists_allows_environments(self):
+ allowed = self.af.get_allowed_environments()
+ self.assertTrue(isinstance(allowed, list))
+ self.assertNotEqual(allowed, [])
+
+ def test_creates_host_environment_by_default(self):
+ spec = {
+ 'shell': 'echo hello world',
+ }
+ env = self.af.create_environment(spec, self.project)
+ self.assertTrue(isinstance(env, ick2.HostEnvironment))
+
+ self.assertEqual(
+ env.get_workspace_directory(),
+ os.path.join(self.workspace_area, self.project))
+
+ def test_creates_host_environment_when_asked_explicitly(self):
+ spec = {
+ 'where': 'host',
+ }
+ env = self.af.create_environment(spec, self.project)
+ self.assertTrue(isinstance(env, ick2.HostEnvironment))
+
+ def test_creates_chroot_environment_by_default(self):
+ spec = {
+ 'where': 'chroot',
+ }
+ env = self.af.create_environment(spec, self.project)
+ self.assertTrue(isinstance(env, ick2.ChrootEnvironment))
+
+ def test_creates_container_environment_by_default(self):
+ spec = {
+ 'where': 'container',
+ }
+ env = self.af.create_environment(spec, self.project)
+ self.assertTrue(isinstance(env, ick2.ContainerEnvironment))
+
+ def test_creates_shell_action_on_host(self):
+ def url(blob_name):
+ return blob_name
+
+ spec = {
+ 'shell': 'echo hello, world',
+ }
+ token = 'secret token'
+ self.af.set_token(token)
+ self.af.set_blob_url_func(url)
+ action = self.af.create_action(spec, self.project)
+ self.assertTrue(isinstance(action, ick2.ShellAction))
+ self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment))
+ self.assertEqual(action.get_token(), token)
+ self.assertEqual(action.get_blob_upload_url('foo'), 'foo')
+
+ def test_creates_python_action_on_host(self):
+ spec = {
+ 'python': 'pass',
+ }
+ action = self.af.create_action(spec, self.project)
+ self.assertTrue(isinstance(action, ick2.PythonAction))
+ self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment))
+
+ def test_creates_debootstrap_action_on_host(self):
+ spec = {
+ 'debootstrap': 'auto',
+ }
+ action = self.af.create_action(spec, self.project)
+ self.assertTrue(isinstance(action, ick2.DebootstrapAction))
+ self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment))
+
+ def test_creates_create_workspace_action_on_host(self):
+ spec = {
+ 'action': 'create_workspace',
+ }
+ action = self.af.create_action(spec, self.project)
+ self.assertTrue(isinstance(action, ick2.CreateWorkspaceAction))
+ self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment))
+
+ def test_creates_archive_workspace_action_on_host(self):
+ spec = {
+ 'archive': 'workspace',
+ }
+ action = self.af.create_action(spec, self.project)
+ self.assertTrue(isinstance(action, ick2.ArchiveWorkspaceAction))
+ self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment))
+
+ def test_creates_populate_sysgtre_action_on_host(self):
+ spec = {
+ 'action': 'populate_systree',
+ }
+ action = self.af.create_action(spec, self.project)
+ self.assertTrue(isinstance(action, ick2.PopulateSystreeAction))
+ self.assertTrue(isinstance(action.get_env(), ick2.HostEnvironment))
+
+ def test_raises_exception_for_unknown_step(self):
+ with self.assertRaises(ick2.UnknownStepError):
+ self.af.create_action({}, self.project)
+
+
+class ShellActionTests(unittest.TestCase):
+
+ def test_encodes_parameters(self):
+ params = {
+ 'foo': 'bar',
+ }
+ action = ick2.ShellAction(None)
+ encoded = action.encode64(params)
+ decoded = action.decode64(encoded)
+ self.assertEqual(params, decoded)
+
+ def test_runs_argv(self):
+ params = {}
+ step = {
+ 'shell': 'echo hello, world',
+ }
+ env = DummyEnvironment()
+ env.exit_code = 42
+ action = ick2.ShellAction(env)
+ exit_code = action.execute(params, step)
+ self.assertEqual(exit_code, env.exit_code)
+
+
+class PythonActionTests(unittest.TestCase):
+
+ def test_runs_argv(self):
+ params = {}
+ step = {
+ 'python': 'pass',
+ }
+ env = DummyEnvironment()
+ env.exit_code = 42
+ action = ick2.PythonAction(env)
+ exit_code = action.execute(params, step)
+ self.assertEqual(exit_code, env.exit_code)
+
+
+class DebootstrapActionTests(unittest.TestCase):
+
+ def test_runs_argv(self):
+ params = {
+ 'debian_codename': 'stretch',
+ }
+ step = {
+ 'debootstrap': 'auto',
+ 'mirror': 'http://deb.debian.org/debian',
+ }
+ env = DummyEnvironment()
+ env.exit_code = 42
+ action = ick2.DebootstrapAction(env)
+ exit_code = action.execute(params, step)
+ self.assertEqual(exit_code, env.exit_code)
+
+
+class CreateWorkspaceActionTests(unittest.TestCase):
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.tempdir)
+
+ def test_runs_argv(self):
+ params = {}
+ step = {
+ 'action': 'create_workspace',
+ }
+ env = DummyEnvironment(tempdir=self.tempdir)
+ action = ick2.CreateWorkspaceAction(env)
+ exit_code = action.execute(params, step)
+ self.assertEqual(exit_code, 0)
+
+
+class DummyEnvironment:
+
+ def __init__(self, tempdir=None):
+ self.exit_code = None
+ self.argv = None
+ self.tempdir = tempdir or tempfile.mkdtemp()
+
+ def report(self, exit_code, msg):
+ pass
+
+ def host_runcmd(self, argv, **kwargs):
+ self.argv = argv
+ return self.exit_code
+
+ def runcmd(self, argv, **kwargs):
+ return self.host_runcmd(argv)
+
+ def get_workspace_directory(self):
+ return self.tempdir
diff --git a/ick2/client.py b/ick2/client.py
new file mode 100644
index 0000000..99b8487
--- /dev/null
+++ b/ick2/client.py
@@ -0,0 +1,238 @@
+# Copyright (C) 2018 Lars Wirzenius
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import json
+import logging
+import time
+
+
+import requests
+
+
+class HttpError(Exception):
+
+ pass
+
+
+class HttpAPI:
+
+ # Make requests to an HTTP API.
+
+ json_type = 'application/json'
+
+ def __init__(self):
+ self._session = requests.Session()
+ self._token = None
+
+ def set_session(self, session):
+ self._session = session
+
+ def set_token(self, token):
+ self._token = token
+
+ def get_dict(self, url, headers=None):
+ r = self._request(self._session.get, url, headers=headers)
+ ct = r.headers.get('Content-Type')
+ if ct != self.json_type:
+ raise HttpError('Not JSON response')
+ try:
+ return r.json()
+ except json.decoder.JSONDecodeError:
+ raise HttpError('JSON parsing error')
+
+ def get_blob(self, url, headers=None):
+ r = self._request(self._session.get, url, headers=headers)
+ return r.content
+
+ def post(self, url, headers=None, body=None):
+ self._send_request(self._session.post, url, headers=headers, body=body)
+ return None
+
+ def put(self, url, headers=None, body=None):
+ self._send_request(self._session.put, url, headers=headers, body=body)
+ return None
+
+ def _send_request(self, func, url, headers=None, body=None):
+ if headers is None:
+ headers = {}
+ headers = dict(headers)
+ h, body = self._get_content_type_header(body)
+ headers.update(h)
+ self._request(func, url, headers=headers, data=body)
+ return None
+
+ def _get_content_type_header(self, body):
+ if isinstance(body, dict):
+ header = {
+ 'Content-Type': 'application/json',
+ }
+ body = json.dumps(body)
+ return header, body
+ return {}, body
+
+ def _get_authorization_headers(self):
+ return {
+ 'Authorization': 'Bearer {}'.format(self._token),
+ }
+
+ def _request(self, func, url, headers=None, **kwargs):
+ if headers is None:
+ headers = {}
+ headers.update(self._get_authorization_headers())
+
+ logging.debug('request: func=%r', func)
+ logging.debug('request: url=%r', url)
+ for h in headers:
+ logging.debug('request: %s: %s', h, headers[h])
+ logging.debug('request: kwargs=%r', kwargs)
+
+ r = func(url, headers=headers, verify=False, **kwargs)
+ logging.debug('response: status_code=%r', r.status_code)
+ logging.debug('response: content=%r', r.content)
+
+ if not r.ok:
+ raise HttpError(r.status_code)
+ return r
+
+
+class ControllerClient:
+
+ def __init__(self):
+ self._name = None
+ self._api = HttpAPI()
+ self._url = None
+
+ def set_client_name(self, name):
+ self._name = name
+
+ def set_http_api(self, api):
+ self._api = api
+
+ def set_controller_url(self, url):
+ self._url = url
+
+ def set_token(self, token):
+ self._api.set_token(token)
+
+ def url(self, path):
+ return '{}{}'.format(self._url, path)
+
+ def get_artifact_store_url(self):
+ url = self.url('/version')
+ version = self._api.get_dict(url)
+ url = version.get('artifact_store')
+ logging.info('Artifact store URL: %r', url)
+ return url
+
+ def get_blob_client(self):
+ url = self.get_artifact_store_url()
+ blobs = BlobClient()
+ blobs.set_url(url)
+ blobs.set_http_api(self._api)
+ return blobs
+
+ def register(self):
+ assert self._url is not None
+ url = self.url('/workers')
+ logging.info('Registering worker %s to %s', self._name, url)
+ body = {
+ 'worker': self._name,
+ }
+ try:
+ self._api.post(url, body=body)
+ except HttpError:
+ pass
+
+ def get_work(self):
+ url = self.url('/work/{}'.format(self._name))
+ work = self._api.get_dict(url)
+ logging.info('Requested work, got: %r', work)
+ return work
+
+ def report_work(self, work):
+ logging.info('Reporting work: %r', work)
+ url = self.url('/work')
+ headers = {
+ 'Content-Type': self._api.json_type,
+ }
+ body = json.dumps(work)
+ self._api.post(url, headers=headers, body=body)
+
+
+class Reporter: # pragma: no cover
+
+ def __init__(self, api, work):
+ self._api = api
+ self._work = dict(work)
+
+ def report(self, exit_code, stdout, stderr):
+ result = dict(self._work)
+ result['exit_code'] = exit_code
+ result['stdout'] = self.make_string(stdout)
+ result['stderr'] = self.make_string(stderr)
+ for key in result:
+ logging.debug('Reporter: result[%r]=%r', key, result[key])
+ self._api.report_work(result)
+
+ def make_string(self, thing):
+ if isinstance(thing, bytes):
+ return thing.decode('UTF-8')
+ if isinstance(thing, str):
+ return thing
+ if thing is None:
+ return thing
+ return repr(thing)
+
+ def get_work_result(self, work):
+ return {
+ 'build_id': work['build_id'],
+ 'worker': work['worker'],
+ 'project': work['project'],
+ 'pipeline': work['pipeline'],
+ 'stdout': '',
+ 'stderr': '',
+ 'exit_code': None,
+ 'timestamp': self.now(),
+ }
+
+ def now(self):
+ return time.strftime('%Y-%m-%dT%H:%M:%S')
+
+
+class BlobClient:
+
+ def __init__(self):
+ self._url = None
+ self._api = None
+
+ def set_url(self, url):
+ self._url = url
+
+ def set_http_api(self, api):
+ self._api = api
+
+ def url(self, blob_name):
+ assert self._url is not None
+ return '{}/blobs/{}'.format(self._url, blob_name)
+
+ def download(self, blob_name):
+ logging.info('Download blob %s', blob_name)
+ url = self.url(blob_name)
+ return self._api.get_blob(url)
+
+ def upload(self, blob_name, blob):
+ logging.info('Upload blob %s', blob_name)
+ url = self.url(blob_name)
+ return self._api.put(url, body=blob)
diff --git a/ick2/client_tests.py b/ick2/client_tests.py
new file mode 100644
index 0000000..92da36a
--- /dev/null
+++ b/ick2/client_tests.py
@@ -0,0 +1,274 @@
+# Copyright (C) 2018 Lars Wirzenius
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import json
+import unittest
+
+
+import ick2
+
+
+json_type = 'application/json'
+
+
+class HttpAPITests(unittest.TestCase):
+
+ def setUp(self):
+ self.session = FakeHttpSession()
+ self.session.token = 'SECRET-TOKEN'
+ self.client = ick2.HttpAPI()
+ self.client.set_session(self.session)
+ self.client.set_token(self.session.token)
+
+ def test_get_dict_raises_exception_on_error(self):
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ self.client.get_dict('http://controller/version')
+
+ def test_get_dict_raises_exception_on_not_json(self):
+ self.session.response = FakeResponse(200, body=json.dumps('{}'))
+ with self.assertRaises(ick2.HttpError):
+ self.client.get_dict('http://controller/version')
+
+ def test_get_dict_raises_exception_on_malformed_json(self):
+ self.session.response = FakeResponse(
+ 200, body='this is not really JSON', content_type=json_type)
+ with self.assertRaises(ick2.HttpError):
+ self.client.get_dict('http://controller/version')
+
+ def test_get_dict_returns_response(self):
+ version = {
+ 'version': '1.0',
+ }
+
+ self.session.response = FakeResponse(
+ 200, body=json.dumps(version), content_type=json_type)
+ obj = self.client.get_dict('http://controller/version')
+ self.assertEqual(obj, version)
+
+ def test_get_blob_raises_exception_on_error(self):
+ self.session.response = FakeResponse(404)
+ with self.assertRaises(ick2.HttpError):
+ self.client.get_blob('http://blobs/foo')
+
+ def test_get_blob_returns_response(self):
+ blob = b'hello, world\n'
+ self.session.response = FakeResponse(200, body=blob)
+ obj = self.client.get_blob('http://blobs/foo')
+ self.assertEqual(obj, blob)
+
+ def test_post_raises_exception_on_error(self):
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ self.client.post('http://controller/work', body='')
+
+ def test_post_succeeds(self):
+ work = {
+ 'project': 'foo',
+ 'stdout': 'hello, world\n',
+ }
+
+ self.session.response = FakeResponse(
+ 201, body=json.dumps(work), content_type=json_type)
+ obj = self.client.post('http://controller/work', body=work)
+ self.assertEqual(obj, None)
+
+ def test_put_raises_exception_on_error(self):
+ blob = b'fooblob'
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ self.client.put('http://blobs/foo', body=blob)
+
+ def test_put_succeeds(self):
+ blob = b'fooblob'
+ self.session.response = FakeResponse(201, body=None)
+ obj = self.client.put('http://controller/work', body=blob)
+ self.assertEqual(obj, None)
+
+
+class ControllerClientTests(unittest.TestCase):
+
+ def setUp(self):
+ self.session = FakeHttpSession()
+ self.session.token = 'SECRET-TOKEN'
+
+ self.client = ick2.HttpAPI()
+ self.client.set_session(self.session)
+ self.client.set_token(self.session.token)
+
+ self.controller = ick2.ControllerClient()
+ self.controller.set_http_api(self.client)
+ self.controller.set_controller_url('https://controller')
+ self.controller.set_token('SECRET-TOKEN')
+ self.controller.set_client_name('asterix')
+
+ def test_register_succeeds_on_error(self):
+ self.session.response = FakeResponse(400)
+ self.assertEqual(self.controller.register(), None)
+
+ def test_register_succeeds(self):
+ self.session.response = FakeResponse(200)
+ self.assertEqual(self.controller.register(), None)
+
+ def test_get_work_raises_exception_on_error(self):
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ self.controller.get_work()
+
+ def test_get_work_succeeds(self):
+ work = {
+ 'build_id': 'rome/1'
+ }
+ self.session.response = FakeResponse(
+ 200, body=json.dumps(work), content_type=json_type)
+ self.assertEqual(self.controller.get_work(), work)
+
+ def test_report_work_raises_exception_on_error(self):
+ work = {
+ 'stdout': 'hello, world',
+ }
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ self.controller.report_work(work)
+
+ def test_report_work_succeeds(self):
+ work = {
+ 'stdout': 'hello, world',
+ }
+ self.session.response = FakeResponse(200)
+ self.assertEqual(self.controller.report_work(work), None)
+
+ def test_get_artifact_store_url_raises_exception_on_error(self):
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ self.controller.get_artifact_store_url()
+
+ def test_get_artifact_store_url_succeeds(self):
+ url = 'https://blobs'
+ version = {
+ 'artifact_store': url,
+ }
+ self.session.response = FakeResponse(
+ 200, body=json.dumps(version), content_type=json_type)
+ self.assertEqual(self.controller.get_artifact_store_url(), url)
+
+
+class BlobServiceClientTests(unittest.TestCase):
+
+ def setUp(self):
+ self.session = FakeHttpSession()
+ self.session.token = 'SECRET-TOKEN'
+
+ self.client = ick2.HttpAPI()
+ self.client.set_session(self.session)
+ self.client.set_token(self.session.token)
+
+ self.controller = ick2.ControllerClient()
+ self.controller.set_http_api(self.client)
+ self.controller.set_controller_url('https://controller')
+ self.controller.set_token('SECRET-TOKEN')
+ self.controller.set_client_name('asterix')
+
+ def get_blob_client(self):
+ url = 'https://blobs'
+ version = {
+ 'artifact_store': url,
+ }
+ self.session.response = FakeResponse(
+ 200, body=json.dumps(version), content_type=json_type)
+
+ return self.controller.get_blob_client()
+
+ def test_get_blob_client_raises_exception_on_error(self):
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ self.controller.get_blob_client()
+
+ def test_get_blob_client_succeeds(self):
+ blobs = self.get_blob_client()
+ self.assertTrue(isinstance(blobs, ick2.BlobClient))
+
+ def test_download_raises_exception_on_error(self):
+ blobs = self.get_blob_client()
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ blobs.download('foo')
+
+ def test_download_succeeds(self):
+ blobs = self.get_blob_client()
+ blob = b'hello, world'
+ self.session.response = FakeResponse(200, body=blob)
+ self.assertEqual(blobs.download('foo'), blob)
+
+ def test_upload_raises_exception_on_error(self):
+ blobs = self.get_blob_client()
+ blob = b'hello, world'
+ self.session.response = FakeResponse(400)
+ with self.assertRaises(ick2.HttpError):
+ blobs.upload('foo', blob)
+
+ def test_upload_succeeds(self):
+ blobs = self.get_blob_client()
+ blob = b'hello, world'
+ self.session.response = FakeResponse(200)
+ self.assertEqual(blobs.upload('foo', blob), None)
+
+
+class FakeHttpSession:
+
+ def __init__(self):
+ self.response = None
+ self.token = None
+
+ def get(self, url, headers=None, verify=None):
+ assert self.response is not None
+ assert self.is_authorized(headers)
+ return self.response
+
+ def post(self, url, headers=None, data=None, verify=None):
+ assert self.response is not None
+ assert self.is_authorized(headers)
+ return self.response
+
+ def put(self, url, headers=None, data=None, verify=None):
+ assert self.response is not None
+ assert self.is_authorized(headers)
+ return self.response
+
+ def is_authorized(self, headers):
+ assert self.token is not None
+ assert 'Authorization' in headers
+ v = headers['Authorization']
+ return v == 'Bearer {}'.format(self.token)
+
+
+class FakeResponse:
+
+ def __init__(self, status_code, body=None, content_type=None):
+ if content_type is None:
+ content_type = 'application/octet-stream'
+
+ self.status_code = status_code
+ self.headers = {
+ 'Content-Type': content_type,
+ }
+ self.content = body
+
+ @property
+ def ok(self):
+ return self.status_code in (200, 201)
+
+ def json(self):
+ return json.loads(self.content)
diff --git a/ick2/workspace.py b/ick2/workspace.py
new file mode 100644
index 0000000..38cf5e4
--- /dev/null
+++ b/ick2/workspace.py
@@ -0,0 +1,64 @@
+# Copyright (C) 2018 Lars Wirzenius
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import os
+import shutil
+
+
+class WorkspaceArea:
+
+ def __init__(self):
+ self._root = None
+
+ def set_root(self, root):
+ self._root = root
+
+ def get_root(self):
+ return self._root
+
+ def clear(self): # pragma: no cover
+ names = os.listdir(self._root)
+ for name in names:
+ pathname = os.path.join(self._root, name)
+ if os.path.isdir(pathname):
+ shutil.rmtree(pathname)
+ else:
+ os.remove(pathname)
+
+ def create_workspace(self, name):
+ dirname = self.get_workspace_directory(name)
+ if not os.path.exists(dirname):
+ os.mkdir(dirname)
+ ws = Workspace()
+ ws.set_directory(dirname)
+ return ws
+
+ def get_workspace_directory(self, name):
+ return os.path.join(self._root, name)
+
+
+class Workspace:
+
+ def __init__(self):
+ self._dirname = None
+
+ def set_directory(self, dirname):
+ self._dirname = dirname
+
+ def get_directory(self):
+ return self._dirname
+
+ def destroy(self):
+ shutil.rmtree(self.get_directory())
diff --git a/ick2/workspace_tests.py b/ick2/workspace_tests.py
new file mode 100644
index 0000000..fa5e788
--- /dev/null
+++ b/ick2/workspace_tests.py
@@ -0,0 +1,68 @@
+# Copyright (C) 2018 Lars Wirzenius
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import os
+import shutil
+import tempfile
+import unittest
+
+
+import ick2
+
+
+class WorkspaceAreaTests(unittest.TestCase):
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+ self.area = ick2.WorkspaceArea()
+ self.area.set_root(self.tempdir)
+
+ def tearDown(self):
+ shutil.rmtree(self.tempdir)
+
+ def test_sets_root(self):
+ self.assertEqual(self.area.get_root(), self.tempdir)
+
+ def test_clears_workspace_area(self):
+ name = 'foo-project'
+ ws = self.area.create_workspace(name)
+ self.area.clear()
+ self.assertEqual(os.listdir(self.tempdir), [])
+
+ def test_creates_workspace(self):
+ name = 'foo-project'
+ ws = self.area.create_workspace(name)
+ self.assertTrue(isinstance(ws, ick2.Workspace))
+ dirname = os.path.join(self.tempdir, name)
+ self.assertEqual(ws.get_directory(), dirname)
+ self.assertTrue(os.path.isdir(dirname))
+
+ def test_creates_workspace_twice(self):
+ name = 'foo-project'
+ ws0 = self.area.create_workspace(name)
+ ws = self.area.create_workspace(name)
+ self.assertTrue(isinstance(ws, ick2.Workspace))
+ dirname = os.path.join(self.tempdir, name)
+ self.assertEqual(ws.get_directory(), dirname)
+ self.assertTrue(os.path.isdir(dirname))
+
+ def test_destroys_nonempty_workspace(self):
+ name = 'foo-project'
+ ws = self.area.create_workspace(name)
+ dirname = ws.get_directory()
+ with open(os.path.join(dirname, 'file.txt'), 'w') as f:
+ f.write('hello, world')
+ ws.destroy()
+ self.assertFalse(os.path.exists(dirname))
diff --git a/worker_manager b/worker_manager
index 7530469..ad01fd7 100755
--- a/worker_manager
+++ b/worker_manager
@@ -15,18 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import base64
-import json
import logging
-import os
-import subprocess
-import tempfile
import time
import apifw
import cliapp
import Crypto.PublicKey.RSA
-import requests
import urllib3
import ick2
@@ -89,16 +83,25 @@ class WorkerManager(cliapp.Application):
)
def process_args(self, args):
+ try:
+ self.main_program(args)
+ except BaseException as e:
+ logging.error(str(e), exc_info=True)
+ raise
+
+ def main_program(self, args):
self.settings.require('name')
self.settings.require('controller')
name = self.settings['name']
url = self.settings['controller']
+ workspace = self.settings['workspace']
+ systree = self.settings['systree']
+
tg = TokenGenerator()
tg.set_key(self.settings['token-key'])
api = ControllerAPI(name, url, tg)
- worker = Worker(
- name, api, self.settings['workspace'], self.settings['systree'])
+ worker = Worker(name, api, workspace, systree)
logging.info('Worker manager %s starts, controller is %s', name, url)
@@ -118,106 +121,43 @@ class WorkerManager(cliapp.Application):
class ControllerAPI:
def __init__(self, name, url, token_generator):
- self._name = name
- self._url = url
- self._blob_url = None
self._token_generator = token_generator
- self._httpapi = HttpApi()
+ self._cc = ick2.ControllerClient()
+ self._cc.set_client_name(name)
+ self._cc.set_controller_url(url)
+ self._blobs = None
+
+ def get_token(self):
+ return self._token_generator.get_token()
def register(self):
- logging.info('Registering worker %s to %s', self._name, self._url)
- url = self.url('/workers')
- headers = self.get_auth_headers()
- body = {
- 'worker': self._name,
- }
- code = self._httpapi.post(url, headers, body)
- if code not in [200, 201, 409]:
- raise cliapp.AppException('Failed to register worker')
+ self._cc.set_token(self.get_token())
+ self._cc.register()
def get_work(self):
- url = self.url('/work/{}'.format(self._name))
- headers = self.get_auth_headers()
- work = self._httpapi.get(url, headers)
- if work:
- logging.info('Response: %r', work)
- return work
+ self._cc.set_token(self.get_token())
+ return self._cc.get_work()
def report_work(self, work):
- logging.info('POST %s', work)
- url = self.url('/work')
- headers = self.get_auth_headers()
- code = self._httpapi.post(url, headers, work)
- if code not in (200, 201):
- raise cliapp.AppException(
- 'Error posting data to controller: {}'.format(code))
-
- def url(self, path):
- return '{}{}'.format(self._url, path)
-
- def get_auth_headers(self):
- token = self._token_generator.get_token()
- return {
- 'Authorization': 'Bearer {}'.format(token),
- }
+ self._cc.set_token(self.get_token())
+ self._cc.report_work(work)
+
+ def get_blob_upload_url(self, name):
+ blobs = self.get_blob_client()
+ return blobs.url(name)
def upload_blob(self, blob_id, blob):
- logging.info('Upload blob %s', blob_id)
- url = self.bloburl(blob_id)
- headers = self.get_auth_headers()
- code = self._httpapi.put(url, headers, blob)
+ blobs = self.get_blob_client()
+ blobs.upload(blob_id, blob)
def download_blob(self, blob_id):
- logging.info('Download blob %s', blob_id)
- url = self.bloburl(blob_id)
- headers = self.get_auth_headers()
- return self._httpapi.get_blob(url, headers)
-
- def bloburl(self, blob_id):
- if self._blob_url is None:
- self._blob_url = self.get_artifact_store_url()
- if self._blob_url is not None:
- return '{}/blobs/{}'.format(self._blob_url, blob_id)
- logging.error('Do not artifact store URL')
- return None
-
- def get_artifact_store_url(self):
- url = self.url('/version')
- headers = self.get_auth_headers()
- version = self._httpapi.get(url, headers)
- logging.info('Version: %r', version)
- if version:
- return version.get('artifact_store')
-
-
-class HttpApi:
-
- def __init__(self):
- self._session = requests.Session()
-
- def post(self, url, headers, body):
- r = self._session.post(url, json=body, headers=headers, verify=False)
- if not r.ok:
- logging.warning('Error: POST %s returned %s', url, r.status_code)
- return r.status_code
+ blobs = self.get_blob_client()
+ return blobs.download(blob_id)
- def put(self, url, headers, body):
- r = self._session.put(url, data=body, headers=headers, verify=False)
- if not r.ok:
- logging.warning('Error: PUT %s returned %s', url, r.status_code)
- return r.status_code
-
- def get(self, url, headers):
- r = self._session.get(url, headers=headers, verify=False)
- if not r.ok or not r.text:
- return None
- return r.json()
-
- def get_blob(self, url, headers):
- r = self._session.get(url, headers=headers, verify=False)
- if not r.ok:
- return None
- return r.content
+ def get_blob_client(self):
+ if self._blobs is None:
+ self._blobs = self._cc.get_blob_client()
+ return self._blobs
class TokenGenerator:
@@ -290,353 +230,23 @@ class Worker:
self._systree = systree
def do_work(self, work):
-
- def post(stream_name, data):
- s = self.get_work_result(work)
- s[stream_name] = data
- self._api.report_work(s)
-
- step = work['step']
- logging.info('Running step: %r', step)
- exit_code = 0
- if exit_code == 0:
- klass = self.worker_factory(step)
- if klass is None:
- exit_code = -1
- else:
- worker = klass(self._api, self._workspace, self._systree, post)
- exit_code = worker.do(work)
- self.finish_work(work, exit_code)
-
- def get_work_result(self, work):
- return {
- 'build_id': work['build_id'],
- 'worker': self._name,
- 'project': work['project'],
- 'pipeline': work['pipeline'],
- 'stdout': '',
- 'stderr': '',
- 'exit_code': None,
- 'timestamp': self.now(),
- }
-
- def now(self):
- return time.strftime('%Y-%m-%dT%H:%M:%S')
-
- def worker_factory(self, step):
- table = [
- ('shell', None, ShellWorker),
- ('python', None, PythonWorker),
- ('debootstrap', None, DebootstrapWorker),
- ('archive', None, WorkspaceArchiver),
- ('action', 'populate_systree', SystreePopulator),
- ('action', 'create_workspace', WorkspaceCleaner),
- ]
-
- for key, value, klass in table:
- if key in step and (value is None or step[key] == value):
- return klass
-
- logging.warning('Cannot find worker for %s', step)
- return None
-
- def finish_work(self, work, exit_code):
- s = self.get_work_result(work)
- s['exit_code'] = exit_code
- self._api.report_work(s)
-
-
-class Runner:
-
- def __init__(self, post_output):
- self._post = post_output
- self._buffers = {'stdout': '', 'stderr': ''}
- self._maxbuf = 1024 * 256
- self._timeout = 1.0
-
- def runcmd(self, argv, **kwargs):
- logging.debug('Runner.runcmd: argv=%r %r', argv, kwargs)
- exit_code, _, _ = cliapp.runcmd_unchecked(
- argv,
- stdout_callback=self.capture_stdout,
- stderr=subprocess.STDOUT,
- output_timeout=self._timeout,
- timeout_callback=self.flush,
- **kwargs
- )
- self.flush()
- logging.debug('Runner.runcmd: finished, exit_code=%r', exit_code)
- return exit_code
-
- def capture_stdout(self, data):
- return self.capture('stdout', data)
-
- def capture_stderr(self, data):
- return self.capture('stderr', data)
-
- def capture(self, stream_name, data):
- logging.debug('CAPTURE %s: %r', stream_name, data)
- self._buffers[stream_name] += data.decode('UTF-8')
- if len(self._buffers[stream_name]) >= self._maxbuf:
- self.flush()
- return b''
-
- def flush(self):
- logging.debug('flushing: self._buffers=%r', self._buffers)
- streams = list(self._buffers.keys())
- logging.debug('streams: %r', streams)
- for stream in streams:
- buf = self._buffers[stream]
- logging.debug('FLUSH: %s: %r', stream, buf)
- self._buffers[stream] = ''
- if buf:
- logging.debug('Posting %d bytes of %s', len(buf), stream)
- self._post(stream, buf)
-
-
-class Mounter:
-
- def __init__(self, mounts, runner):
- self._mounts = mounts
- self._runner = runner
-
- def __enter__(self):
- self.mount()
- return self
-
- def __exit__(self, *args):
- self.unmount()
-
- def mount(self):
- for dirname, mp in self._mounts:
- if not os.path.exists(mp):
- os.mkdir(mp)
- self._runner.runcmd(['sudo', 'mount', '--bind', dirname, mp])
-
- def unmount(self):
- for dirname, mp in reversed(self._mounts):
- self._runner.runcmd(['sudo', 'umount', mp])
-
-
-class WorkerBase:
-
- def __init__(self, api, workspace, systree, post):
- self._api = api
- self._workspace = workspace
- self._systree = systree
- self._post = post
-
- def do(self, work):
- params = work.get('parameters', {})
- params_text = self.params64(params)
- argv = self.get_argv(work, params_text)
- mounts = []
- env = dict(os.environ)
- env.update({
- 'LC_ALL': 'C',
- 'DEBIAN_FRONTEND': 'noninteractive',
- })
- if self.where(work) == 'chroot':
- logging.debug('CHROOT REQUESTED')
- argv = ['sudo', 'chroot', self._workspace] + argv
- mounts = [
- ('/proc', os.path.join(self._workspace, 'proc')),
- ('/sys', os.path.join(self._workspace, 'sys')),
- ]
- elif self.where(work) == 'container':
- logging.debug('CONTAINER REQUESTED')
- bind = '{}:/workspace'.format(self._workspace)
- mp = '{}/workspace'.format(self._workspace)
- if not os.path.exists(mp):
- os.mkdir(mp)
- argv = [
- 'sudo', 'systemd-nspawn', '-D', self._systree, '--bind', bind,
- '--chdir', '/workspace',
- ] + argv
- else:
- logging.debug('HOST REQUESTED')
- runner = Runner(self._post)
- with Mounter(mounts, runner):
- return runner.runcmd(argv, cwd=self._workspace, env=env)
-
- def params64(self, params):
- as_json = json.dumps(params)
- as_bytes = as_json.encode('UTF-8')
- as_base64 = base64.b64encode(as_bytes)
- return as_base64.decode('UTF-8')
-
- def where(self, work):
+ logging.debug('Doing work: %r', work)
+ project_name = work['project']
step = work.get('step', {})
- return step.get('where')
-
- def get_argv(self, work, params_text):
- raise NotImplementedError()
-
-
-class ShellWorker(WorkerBase):
-
- def get_argv(self, work, params_text):
- step = work['step']
- code_snippet = step['shell']
- where = step.get('where', 'host')
- prefix = 'params() { echo -n "%s" | base64 -d; }\n' % params_text
- return ['bash', '-exuc', prefix + code_snippet]
-
-
-class PythonWorker(WorkerBase):
-
- def get_argv(self, work, params_text):
- prefix = (
- 'import base64, json\n'
- 'params = json.loads(base64.b64decode(\n'
- ' "{}").decode("utf8"))\n'
- ).format(params_text)
- code_snippet = work['step']['python']
- return ['python3', '-c', prefix + '\n' + code_snippet]
-
-
-class DebootstrapWorker(WorkerBase):
-
- def get_argv(self, work, params_text):
- step = work['step']
- params = work.get('parameters', {})
- if step['debootstrap'] == 'auto':
- suite = params['debian_codename']
- else:
- suite = step['debootstrap']
-
- return [
- 'sudo',
- 'debootstrap',
- suite,
- '.',
- step.get('mirror', 'http://deb.debian.org/debian'),
- ]
-
-
-class WorkspaceCleaner(WorkerBase):
-
- def get_argv(self, work, params_text):
- return ['sudo', 'find', '.', '-delete']
-
-
-class WorkspaceArchiver(WorkerBase):
-
- def do(self, work):
+ logging.debug('Doing work: step=%r', step)
params = work.get('parameters', {})
- blob_id = params.get('systree_name', 'noname')
- step = work['step']
- dirname = step['archive']
-
- if dirname == 'workspace':
- dirname = self._workspace
-
- runner = Runner(self._post)
-
- temp = self.archive(runner, dirname)
- if isinstance(temp, int):
- return temp
-
- result = self.upload(runner, temp, blob_id)
- os.remove(temp)
-
- return result
-
- def archive(self, runner, dirname):
- logging.info('Archiving %s', dirname)
- self.report(b'Archiving workspace\n')
-
- fd, filename = tempfile.mkstemp()
- os.close(fd)
-
- archive = ['sudo', 'tar', '-zcf', filename, '-C', dirname, '.']
- exit_code = runner.runcmd(archive)
- if exit_code != 0:
- os.remove(filename)
- return exit_code
-
- return filename
-
- def upload(self, runner, filename, blob_id):
- logging.info('Uploading %s', blob_id)
- self.report(b'Uploading workspace\n')
-
- url = self._api.bloburl(blob_id)
- headers = self._api.get_auth_headers()
- upload = ['curl', '-sk', '-T', filename]
- for name, value in headers.items():
- header = '{}: {}'.format(name, value)
- upload.extend(['-H', header])
- upload.append(url)
-
- exit_code = runner.runcmd(upload)
- if exit_code == 0:
- self.report(b'Uploaded tarball of workspace\n')
-
- return exit_code
-
- def execute_argv(self, *argvs, **kwargs):
- exit_code, _, _ = cliapp.runcmd_unchecked(
- *argvs,
- **kwargs,
- )
- return exit_code
-
- def report(self, data):
- self._post('stdout', data.decode('UTF-8'))
- return b''
-
- def get_argv(self, work, params_text):
- assert False
-
-
-class SystreePopulator(WorkerBase):
-
- systree_dir = '/var/lib/ick/systree'
-
- def do(self, work):
- step = work['step']
- systree_name = step.get('systree_name')
- if not systree_name:
- self.report(
- b'No systree_name field in action, no systree population\n')
- return 1
-
- self.make_directory_empty(self.systree_dir)
-
- self.report(b'Downloading and unpacking systree blob\n')
- return self.download_and_unpack_systree(systree_name, self.systree_dir)
-
- def make_directory_empty(self, dirname):
- return self.execute_argv(['sudo', 'find', '-delete'], cwd=dirname)
-
- def download_and_unpack_systree(self, systree_name, dirname):
- url = self._api.bloburl(systree_name)
- headers = self._api.get_auth_headers()
- download = ['curl', '-k']
- for name, value in headers.items():
- header = '{}: {}'.format(name, value)
- download.extend(['-H', header])
- download.append(url)
-
- unpack = ['sudo', 'tar', '-zxf', '-', '-C', dirname]
-
- return self.execute_argv(download, unpack)
-
- def execute_argv(self, *argvs, **kwargs):
- exit_code, _, _ = cliapp.runcmd_unchecked(
- *argvs,
- stdout_callback=self.report,
- stderr_callback=self.report,
- **kwargs,
- )
- return exit_code
-
- def report(self, data):
- self._post('stdout', data.decode('UTF-8'))
-
- def get_argv(self, work, params_text):
- assert False
+ logging.debug('Doing work: params=%r', params)
+ reporter = ick2.Reporter(self._api, work)
+ logging.debug('Doing work: reporter=%r', reporter)
+ af = ick2.ActionFactory(self._systree, self._workspace, reporter)
+ logging.debug('Doing work: af=%r', af)
+ af.set_token(self._api.get_token())
+ af.set_blob_url_func(self._api.get_blob_upload_url)
+ action = af.create_action(step, project_name)
+ logging.debug('Doing work: action=%r', action)
+ exit_code = action.execute(params, step)
+ logging.debug('Action finished: exit_code=%r', exit_code)
+ logging.debug('Finished work: %r', work)
if __name__ == '__main__':