diff options
Diffstat (limited to 'ick2')
-rw-r--r-- | ick2/__init__.py | 12 | ||||
-rw-r--r-- | ick2/apibase.py | 59 | ||||
-rw-r--r-- | ick2/controllerapi.py | 1 | ||||
-rw-r--r-- | ick2/controllerapi_tests.py | 42 | ||||
-rw-r--r-- | ick2/logapi.py | 4 | ||||
-rw-r--r-- | ick2/persistent.py | 153 | ||||
-rw-r--r-- | ick2/persistent_tests.py | 66 | ||||
-rw-r--r-- | ick2/projectapi.py | 106 | ||||
-rw-r--r-- | ick2/projectapi_tests.py | 47 | ||||
-rw-r--r-- | ick2/state.py | 204 | ||||
-rw-r--r-- | ick2/state_tests.py | 120 | ||||
-rw-r--r-- | ick2/trans.py | 76 | ||||
-rw-r--r-- | ick2/workapi.py | 254 | ||||
-rw-r--r-- | ick2/workapi_tests.py | 35 | ||||
-rw-r--r-- | ick2/workerapi.py | 3 |
15 files changed, 498 insertions, 684 deletions
diff --git a/ick2/__init__.py b/ick2/__init__.py index f212946..18615a4 100644 --- a/ick2/__init__.py +++ b/ick2/__init__.py @@ -15,15 +15,13 @@ from .version import __version__, __version_info__ from .logging import setup_logging, log -from .state import ( - ControllerState, +from .persistent import ( + FilePersistentState, NotFound, - WrongProjectStatus, - Workers, - Projects, - ProjectStatus, - Builds, + Resource, + resource_from_dict, ) +from .trans import TransactionalState from .exceptions import ( BadUpdate, ExistsAlready, diff --git a/ick2/apibase.py b/ick2/apibase.py index cb0d9f9..aeda00b 100644 --- a/ick2/apibase.py +++ b/ick2/apibase.py @@ -19,10 +19,8 @@ import ick2 class APIbase: def __init__(self, state): - self._state = state - - def get_state(self): - return self._state + assert state is None or isinstance(state, ick2.FilePersistentState) + self._trans = ick2.TransactionalState(state) def get_routes(self, path): return [ @@ -100,12 +98,6 @@ class APIbase: ick2.log.log( 'warning', msg_text='PUT Not found', kwargs=kwargs) return ick2.not_found(e) - except ick2.WrongProjectStatus as e: - ick2.log.log( - 'error', - msg_text='Wrong state for project', - exception=str(e)) - return ick2.bad_request(e) ick2.log.log('trace', msg_text='returned body', body=repr(body)) return ick2.OK(body) return wrapper @@ -149,22 +141,28 @@ class ResourceApiBase(APIbase): self._type_name = type_name def list(self, **kwargs): + resources = self._trans.get_resources(self._type_name) return { - self._type_name: self._state.get_resources(self._type_name), + self._type_name: [r.as_dict() for r in resources] } def show(self, name, **kwargs): - return self._state.get_resource(self._type_name, name) + return self._trans.get_resource(self._type_name, name).as_dict() def create(self, body, **kwargs): - resource = self.mangle_new_resource(body) - name = self.get_resource_name(resource) - try: - self._state.get_resource(self._type_name, name) - except ick2.NotFound: - return self._state.add_resource(self._type_name, name, resource) - else: - raise ick2.ExistsAlready(name) + ick2.log.log( + 'trace', msg_text='create resource', + resource_type=self._type_name, body=body, kwargs=kwargs) + + as_dict = self.mangle_new_resource(body) + rid = self.get_resource_name(as_dict) + if self._trans.has_resource(self._type_name, rid): + raise ick2.ExistsAlready(rid) + + with self._trans.new(self._type_name, rid) as resource: + resource.from_dict(as_dict) + + return as_dict def mangle_new_resource(self, resource): # pragma: no cover return resource @@ -173,17 +171,20 @@ class ResourceApiBase(APIbase): raise NotImplementedError() def update(self, body, name, **kwargs): - name = self.get_resource_name(body) - try: - old = self._state.get_resource(self._type_name, name) - except ick2.NotFound: - raise - else: - resource = self.mangle_updated_resource(old, body) - return self._state.update_resource(self._type_name, name, resource) + rid = self.get_resource_name(body) + if not self._trans.has_resource(self._type_name, rid): + raise ick2.NotFound() + + with self._trans.modify(self._type_name, rid) as resource: + as_dict = self.mangle_updated_resource(resource.as_dict(), body) + resource.from_dict(as_dict) + + return as_dict def mangle_updated_resource(self, old, new): # pragma: no cover return new def delete(self, name, **kwargs): - self._state.remove_resource(self._type_name, name) + if not self._trans.has_resource(self._type_name, name): + raise ick2.NotFound() + self._trans.remove_resource(self._type_name, name) diff --git a/ick2/controllerapi.py b/ick2/controllerapi.py index 9785384..c2e7eff 100644 --- a/ick2/controllerapi.py +++ b/ick2/controllerapi.py @@ -58,7 +58,6 @@ class ControllerAPI: routes = [] for path, api in self._apis.items(): - self._state.load_resources(path[1:]) routes.extend(api.get_routes(path)) ick2.log.log('info', msg_texg='Found routes', routes=routes) return routes diff --git a/ick2/controllerapi_tests.py b/ick2/controllerapi_tests.py deleted file mode 100644 index 89c553f..0000000 --- a/ick2/controllerapi_tests.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright (C) 2017 Lars Wirzenius -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -import os -import shutil -import tempfile -import unittest - - -import ick2 - - -class ControllerAPITests(unittest.TestCase): - - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.statedir = os.path.join(self.tempdir, 'state/dir') - - def tearDown(self): - shutil.rmtree(self.tempdir) - - def create_api(self): - state = ick2.ControllerState() - state.set_state_directory(self.statedir) - api = ick2.ControllerAPI(state) - return api - - def test_sets_and_creates_state_directory(self): - self.create_api() - self.assertTrue(os.path.exists(self.statedir)) diff --git a/ick2/logapi.py b/ick2/logapi.py index a59cd13..c43e8a6 100644 --- a/ick2/logapi.py +++ b/ick2/logapi.py @@ -60,6 +60,6 @@ class LogAPI(ick2.ResourceApiBase): # pragma: no cover raise ick2.MethodNotAllowed('Updating builds directly is not allowed') def show(self, name, **kwargs): - log = self._state.get_resource('log', str(name)) - ick2.log.log('info', msg_text='Returning log', log=log) + log = self._trans.get_resource('log', str(name)) + ick2.log.log('info', msg_text='Returning log', log=log.as_dict()) return log['log'] diff --git a/ick2/persistent.py b/ick2/persistent.py new file mode 100644 index 0000000..4b216b3 --- /dev/null +++ b/ick2/persistent.py @@ -0,0 +1,153 @@ +# Copyright (C) 2018 Lars Wirzenius +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import copy +import os +import urllib.parse + + +import yaml + + +import ick2 + + +class PersistentStateInterface: # pragma: no cover + + def get_resource_kinds(self): + raise NotImplementedError() + + def get_resource_ids(self, kind): + raise NotImplementedError() + + def has_resource(self, kind, rid): + raise NotImplementedError() + + def get_resource(self, kind, rid): + raise NotImplementedError() + + def get_resources(self, kind): + return [ + self.get_resource(kind, rid) + for rid in self.get_resource_ids(kind) + ] + + def write_resource(self, kind, rid, resource): + raise NotImplementedError() + + def remove_resource(self, kind, rid): + raise NotImplementedError() + + +class FilePersistentState(PersistentStateInterface): + + def __init__(self): + self._dir = None + + def get_directory(self): + return self._dir + + def set_directory(self, dirname): + self._dir = dirname + + def _safe(self, name): + return urllib.parse.quote(name, safe='') + + def _unsafe(self, safe): + return urllib.parse.unquote(safe) + + def _unsafe_list(self, safe_names): + return [self._unsafe(safe) for safe in safe_names] + + def _dirname(self, kind): + return os.path.join(self._dir, self._safe(kind)) + + def _filename(self, kind, rid): + dirname = self._dirname(kind) + return os.path.join(dirname, self._safe(rid)) + + def get_resource_kinds(self): + return self._unsafe_list(os.listdir(self._dir)) + + def has_resource(self, kind, rid): + filename = self._filename(kind, rid) + return os.path.exists(filename) + + def get_resource_ids(self, kind): + dirname = self._dirname(kind) + if os.path.exists(dirname): + return self._unsafe_list(os.listdir(dirname)) + return [] + + def get_resource(self, kind, rid): + filename = self._filename(kind, rid) + if not os.path.exists(filename): + raise ick2.NotFound() + with open(filename, 'r') as f: + as_dict = yaml.safe_load(f) + return resource_from_dict(as_dict) + + def write_resource(self, kind, rid, resource): + dirname = self._dirname(kind) + if not os.path.exists(dirname): + os.makedirs(dirname) + + filename = self._filename(kind, rid) + with open(filename, 'w') as f: + yaml.safe_dump(resource.as_dict(), stream=f) + + def remove_resource(self, kind, rid): + filename = self._filename(kind, rid) + os.remove(filename) + + +class NotFound(Exception): + + def __init__(self): + super().__init__('Resource not found') + + +class Resource: # pragma: no cover + + def __init__(self, as_dict=None): + self._dict = copy.deepcopy(as_dict or {}) + + def as_dict(self): + return copy.deepcopy(self._dict) + + def __getitem__(self, key): + return self._dict[key] + + def __setitem__(self, key, value): + self._dict[key] = value + + def __contains__(self, key): + return key in self._dict + + def __len__(self): + return len(self._dict) + + def get(self, key, default=None): + return self._dict.get(key, default) + + def from_dict(self, as_dict): + self._dict.clear() + for key in as_dict: + self[key] = as_dict[key] + + +def resource_from_dict(as_dict): # pragma: no cover + return Resource(as_dict) diff --git a/ick2/persistent_tests.py b/ick2/persistent_tests.py new file mode 100644 index 0000000..8acb141 --- /dev/null +++ b/ick2/persistent_tests.py @@ -0,0 +1,66 @@ +# Copyright (C) 2018 Lars Wirzenius +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import shutil +import tempfile +import unittest + + +import ick2 + + +class FilePersistentStateTests(unittest.TestCase): + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.state = ick2.FilePersistentState() + self.state.set_directory(self.tempdir) + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_returns_dirname(self): + self.assertEqual(self.state.get_directory(), self.tempdir) + + def test_has_no_resource_kinds_initially(self): + self.assertEqual(self.state.get_resource_kinds(), []) + + def test_has_no_resources_initially(self): + self.assertEqual(self.state.get_resource_ids('silly'), []) + + def test_has_no_resource_initially(self): + with self.assertRaises(ick2.NotFound): + self.state.get_resource('silly', '#1') + + def test_creates_resource(self): + as_dict = {'foo': 'bar'} + r = ick2.resource_from_dict(as_dict) + self.state.write_resource('silly', '#1', r) + self.assertTrue(self.state.has_resource('silly', '#1')) + self.assertEqual(self.state.get_resource_kinds(), ['silly']) + self.assertEqual(self.state.get_resource_ids('silly'), ['#1']) + + r2 = self.state.get_resource('silly', '#1') + self.assertTrue(isinstance(r2, ick2.Resource)) + self.assertEqual(r.as_dict(), r2.as_dict()) + + def test_removes_resource(self): + as_dict = {'foo': 'bar'} + r = ick2.resource_from_dict(as_dict) + self.state.write_resource('silly', '#1', r) + self.state.remove_resource('silly', '#1') + self.assertFalse(self.state.has_resource('silly', '#1')) + self.assertEqual(self.state.get_resource_ids('silly'), []) diff --git a/ick2/projectapi.py b/ick2/projectapi.py index a1a4ac6..22a5bb6 100644 --- a/ick2/projectapi.py +++ b/ick2/projectapi.py @@ -20,7 +20,6 @@ class ProjectAPI(ick2.ResourceApiBase): def __init__(self, state): super().__init__('projects', state) - self._ps = ick2.ProjectStatus(self.get_state()) def mangle_new_resource(self, resource): new = dict(resource) @@ -45,21 +44,6 @@ class ProjectAPI(ick2.ResourceApiBase): trigger_path = '{}/<project>/+trigger'.format(path) return [ { - 'method': 'GET', - 'path': all_statuses_path, - 'callback': self.GET(self.get_all_statuses), - }, - { - 'method': 'GET', - 'path': status_path, - 'callback': self.GET(self.get_status), - }, - { - 'method': 'PUT', - 'path': status_path, - 'callback': self.PUT(self.set_status_callback), - }, - { 'needs-authorization': False, 'method': 'GET', 'path': trigger_path, @@ -67,50 +51,48 @@ class ProjectAPI(ick2.ResourceApiBase): }, ] - def get_all_statuses(self, **kwargs): # pragma: no cover - all_projects = self.list() - projects = all_projects['projects'] - ick2.log.log( - 'trace', msg_text='get_all_statuses', projects=projects) - return { - project['project']: self.get_status(project['project']) - for project in projects - } - - def get_status(self, project, **kwargs): - ick2.log.log('trace', msg_text='get_status', project=project) - _ = self._state.get_resource(self._type_name, project) - ps = self._ps.get_instance(project) - return { - 'status': ps.get('status', 'idle'), - } - - def set_status_callback(self, body, project, **kwargs): # pragma: no cover - return self.set_status(project, body['status']) - - def set_status(self, project, status): - ick2.log.log( - 'trace', msg_text='Setting project status', - project=project, status=status) - - allowed_changes = { - 'idle': 'triggered', - 'triggered': 'building', - 'building': 'idle', - } - - p = self._state.get_resource(self._type_name, project) - ick2.log.log('trace', msg_text='Found project', project=p) - - ps = self._ps.get_instance(project) - old_status = ps.get('status', 'idle') - if allowed_changes[old_status] != status: - raise ick2.WrongProjectStatus(status) - ps['status'] = status - self._ps.update_instance(project, ps) - return {'status': status} - - # This needs to go away as it is not protected. Once an IDP is - # added. def trigger_project(self, project, **kwargs): # pragma: no cover - return self.set_status(project, 'triggered') + with self._trans.modify('projects', project) as p: + self._start_build(p) + return {'status': 'triggered'} + + def _start_build(self, project): # pragma: no cover + build_no = self._pick_build_number(project) + build_id = '{}/{}'.format(project['project'], build_no) + ick2.log.log('info', msg_text='Starting new build', build_id=build_id) + + with self._trans.new('builds', build_id) as build: + parameters = project.get('parameters', {}) + create_workspace = { + 'action': 'create_workspace', + 'where': 'host', + } + actions = [create_workspace] + self._get_actions(project) + build.from_dict({ + 'build_id': build_id, + 'build_number': build_no, + 'log': '/logs/{}'.format(build_id), + 'worker': None, + 'project': project['project'], + 'parameters': parameters, + 'status': 'triggered', + 'actions': actions, + 'current_action': 0, + }) + return build_id, build_no + + def _pick_build_number(self, project): # pragma: no cover + old_build_no = project.get('next_build_id') + build_no = (old_build_no or 0) + 1 + project['next_build_id'] = build_no + ick2.log.log( + 'info', msg_text='chose build number', + old_build_no=old_build_no, build_no=build_no) + return build_no + + def _get_actions(self, project): # pragma: no cover + actions = [] + for pipeline_name in project.get('pipelines', []): + pipeline = self._trans.get_resource('pipelines', pipeline_name) + actions.extend(list(pipeline['actions'])) + return actions diff --git a/ick2/projectapi_tests.py b/ick2/projectapi_tests.py index 2ef1dbe..b6ec9e9 100644 --- a/ick2/projectapi_tests.py +++ b/ick2/projectapi_tests.py @@ -19,9 +19,6 @@ import tempfile import unittest -import yaml - - import ick2 @@ -30,8 +27,8 @@ class ProjectAPITests(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() self.statedir = os.path.join(self.tempdir, 'state/dir') - self.state = ick2.ControllerState() - self.state.set_state_directory(self.statedir) + self.state = ick2.FilePersistentState() + self.state.set_directory(self.statedir) def tearDown(self): shutil.rmtree(self.tempdir) @@ -59,15 +56,6 @@ class ProjectAPITests(unittest.TestCase): project['next_build_id'] = None self.assertEqual(new, project) self.assertEqual(api.list(), {'projects': [new]}) - dirname = os.path.join(self.statedir, 'projects') - filename = os.listdir(dirname)[0] - obj = yaml.safe_load(open(os.path.join(dirname, filename))) - self.assertEqual(api.get_status('foo'), {'status': 'idle'}) - - def test_raises_error_when_getting_missing_project_status(self): - api = self.create_api() - with self.assertRaises(ick2.NotFound): - api.get_status('does-not-exist') def test_loads_projects_from_state_directory(self): project = { @@ -121,34 +109,3 @@ class ProjectAPITests(unittest.TestCase): api = self.create_api() with self.assertRaises(ick2.NotFound): api.delete('foo') - - def test_updates_project_status(self): - project = { - 'project': 'foo', - } - api = self.create_api() - api.create(project) - self.assertEqual(api.get_status('foo'), {'status': 'idle'}) - - with self.assertRaises(ick2.WrongProjectStatus): - api.set_status('foo', 'build') - - api.set_status('foo', 'triggered') - self.assertEqual(api.get_status('foo'), {'status': 'triggered'}) - - with self.assertRaises(ick2.WrongProjectStatus): - api.set_status('foo', 'idle') - - api.set_status('foo', 'building') - self.assertEqual(api.get_status('foo'), {'status': 'building'}) - - with self.assertRaises(ick2.WrongProjectStatus): - api.set_status('foo', 'triggered') - - api.set_status('foo', 'idle') - self.assertEqual(api.get_status('foo'), {'status': 'idle'}) - - def test_raises_error_updating_status_of_missing_pipeline(self): - api = self.create_api() - with self.assertRaises(ick2.NotFound): - api.set_status('does-not-exist', 'triggered') diff --git a/ick2/state.py b/ick2/state.py deleted file mode 100644 index 12b38ba..0000000 --- a/ick2/state.py +++ /dev/null @@ -1,204 +0,0 @@ -# Copyright (C) 2017-2018 Lars Wirzenius -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -import base64 -import glob -import os - - -import yaml - - -import ick2 - - -class ControllerState: - - def __init__(self): - self._statedir = None - - def get_state_directory(self): - return self._statedir - - def set_state_directory(self, dirname): - self._statedir = dirname - if not os.path.exists(self._statedir): - os.makedirs(self._statedir) - - def get_resource_directory(self, type_name): - return os.path.join(self.get_state_directory(), type_name) - - def get_resource_filename(self, type_name, resource_name): - dirname = self.get_resource_directory(type_name) - basename = base64.urlsafe_b64encode( - resource_name.encode()).decode('ascii') - return os.path.join(dirname, basename + '.yaml') - - def load_resources(self, type_name): - assert self._statedir is not None - resources = [] - dirname = self.get_resource_directory(type_name) - for filename in glob.glob(dirname + '/*.yaml'): - obj = self.load_resource(filename) - resources.append(obj) - return resources - - def load_resource(self, filename): - with open(filename, 'r') as f: - return yaml.safe_load(f) - - def get_resources(self, type_name): - return self.load_resources(type_name) - - def get_resource(self, type_name, resource_name): - filename = self.get_resource_filename(type_name, resource_name) - if os.path.exists(filename): - result = self.load_resource(filename) - ick2.log.log( - 'debug', msg_text='get_resource called', - type_name=type_name, resource_name=resource_name, - filename=filename, result=result) - return result - ick2.log.log( - 'warning', msg_text='Cannot find resource', - type_name=type_name, resource_name=resource_name) - raise NotFound() - - def add_resource(self, type_name, resource_name, resource): - filename = self.get_resource_filename(type_name, resource_name) - dirname = os.path.dirname(filename) - if not os.path.exists(dirname): - os.makedirs(dirname) - with open(filename, 'w') as f: - yaml.safe_dump(resource, stream=f) - return resource - - def update_resource(self, type_name, resource_name, body): - filename = self.get_resource_filename(type_name, resource_name) - with open(filename, 'w') as f: - yaml.safe_dump(body, stream=f) - return body - - def remove_resource(self, type_name, resource_name): - filename = self.get_resource_filename(type_name, resource_name) - if not os.path.exists(filename): - raise NotFound() - os.remove(filename) - - -class NotFound(Exception): - - def __init__(self): - super().__init__('Resource not found') - - -class WrongProjectStatus(Exception): # pragma: no cover - - def __init__(self, new_state): - super().__init__('Cannot set project state to {}'.format(new_state)) - - -class ResourceStore: # pragma: no cover - - def __init__(self, state, category, name_field): - self._state = state - self._category = category - self._name_field = name_field - - def get_resource_name(self, name): - return str(name) - - def list(self): - return self._state.get_resources(self._category) - - def get(self, name): - name = self.get_resource_name(name) - try: - return self._state.get_resource(self._category, name) - except ick2.NotFound: - return { - self._name_field: name, - } - - def add(self, name, resource): - name = self.get_resource_name(name) - self._state.add_resource(self._category, name, resource) - - def update(self, resource): - name = self.get_resource_name(resource[self._name_field]) - try: - self._state.get_resource(self._category, name) - except ick2.NotFound: - raise ick2.NotFound() - else: - self._state.update_resource(self._category, name, resource) - - -class Workers(ResourceStore): # pragma: no cover - - def __init__(self, state): - super().__init__(state, 'workers', 'worker') - - def get_worker(self, name): - return self.get(name) - - def update_worker(self, worker): - self.update(worker) - - -class Projects(ResourceStore): # pragma: no cover - - def __init__(self, state): - super().__init__(state, 'projects', 'project') - - def get_projects(self): - return self.list() - - def update_project(self, project): - self.update(project) - - -class Builds(ResourceStore): # pragma: no cover - - def __init__(self, state): - super().__init__(state, 'builds', 'build_id') - - def get_builds(self): - return self.list() - - def update_build(self, build): - self.update(build) - - -class ProjectStatus(ResourceStore): # pragma: no cover - - def __init__(self, state): - super().__init__(state, 'project_status', 'project') - - def _name(self, project_name): - return project_name - - def get_instance(self, project_name): - name = self._name(project_name) - return self.get(name) - - def add_instance(self, project_name, instance): - name = self._name(project_name) - self.add(name, instance) - - def update_instance(self, project_name, instance): - name = self._name(project_name) - self.add(name, instance) diff --git a/ick2/state_tests.py b/ick2/state_tests.py deleted file mode 100644 index 146ed6e..0000000 --- a/ick2/state_tests.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright (C) 2017 Lars Wirzenius -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - - -import os -import shutil -import tempfile -import unittest - - -import ick2 - - -class StateTestsBase: - - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.statedir = os.path.join(self.tempdir, 'state/dir') - - def tearDown(self): - shutil.rmtree(self.tempdir) - - def create_state(self): - state = ick2.ControllerState() - state.set_state_directory(self.statedir) - return state - - -class ControllerStateTests(StateTestsBase, unittest.TestCase): - - def test_has_no_state_directory_initially(self): - state = ick2.ControllerState() - self.assertTrue(state.get_state_directory() is None) - - def test_sets_and_creates_state_directory(self): - state = self.create_state() - statedir = state.get_state_directory() - self.assertEqual(statedir, self.statedir) - self.assertTrue(os.path.exists(statedir)) - - -class ResourceStateTests(StateTestsBase, unittest.TestCase): - - def test_has_not_resources_initially(self): - state = self.create_state() - self.assertEqual(state.get_resources('project'), []) - - def test_creates_resource(self): - project = { - 'project': 'foo', - 'shell_steps': ['build'], - } - state = self.create_state() - state.add_resource('projects', project['project'], project) - self.assertEqual(state.get_resources('projects'), [project]) - self.assertTrue( - os.path.exists(state.get_resource_filename('projects', 'foo'))) - - def test_loads_resource_from_state_directory(self): - project = { - 'project': 'foo', - 'shell_steps': ['build'], - } - state = self.create_state() - state.add_resource('projects', project['project'], project) - - state2 = self.create_state() - state2.load_resources('projects') - self.assertEqual(state2.get_resources('projects'), [project]) - - def test_gets_named_resource(self): - project = { - 'project': 'foo', - 'shell_steps': ['build'], - } - state = self.create_state() - state.add_resource('projects', project['project'], project) - self.assertEqual(state.get_resource('projects', 'foo'), project) - - def test_updates_named_resource(self): - project_v1 = { - 'project': 'foo', - 'shell_steps': ['build'], - } - project_v2 = dict(project_v1) - project_v2['shell_steps'] = ['build it using magic'] - state = self.create_state() - state.add_resource('projects', 'foo', project_v1) - updated = state.update_resource('projects', 'foo', project_v2) - self.assertEqual(updated, project_v2) - self.assertEqual(state.get_resource('projects', 'foo'), project_v2) - - def test_deletes_named_resource(self): - project = { - 'project': 'foo', - 'shell_steps': ['build'], - } - state = self.create_state() - state.add_resource('projects', project['project'], project) - state.remove_resource('projects', 'foo') - self.assertEqual(state.get_resources('projects'), []) - with self.assertRaises(ick2.NotFound): - state.get_resource('projects', 'foo') - - def test_raises_error_deleting_missing_resource(self): - state = self.create_state() - with self.assertRaises(ick2.NotFound): - state.remove_resource('projects', 'foo') diff --git a/ick2/trans.py b/ick2/trans.py new file mode 100644 index 0000000..a66dd41 --- /dev/null +++ b/ick2/trans.py @@ -0,0 +1,76 @@ +# Copyright (C) 2018 Lars Wirzenius +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import ick2 + + +class TransactionalResource: + + def __init__(self, state, kind, rid): + self.state = state + self.kind = kind + self.rid = rid + if state.has_resource(kind, rid): + self.resource = state.get_resource(kind, rid) + else: + self.resource = ick2.resource_from_dict({}) + + methods = [ + 'as_dict', + '__getitem__', + '__setitem__', + '__contains__', + '__len__', + ] + for method in methods: + setattr(self, method, getattr(self.resource, method)) + + def __enter__(self): + return self.resource + + def __exit__(self, exc_type, value, traceback): + if exc_type is None: + self.state.write_resource(self.kind, self.rid, self.resource) + + +class TransactionalState: + + def __init__(self, state): + self.state = state + + def new(self, kind, rid): + return TransactionalResource(self.state, kind, rid) + + def modify(self, kind, rid): + return TransactionalResource(self.state, kind, rid) + + def get_resource_kinds(self): + return self.state.get_resource_kinds() + + def get_resource_ids(self, kind): + return self.state.get_resource_ids(kind) + + def has_resource(self, kind, rid): + return self.state.has_resource(kind, rid) + + def get_resource(self, kind, rid): + return self.state.get_resource(kind, rid) + + def get_resources(self, kind): + return self.state.get_resources(kind) + + def remove_resource(self, kind, rid): + self.state.remove_resource(kind, rid) diff --git a/ick2/workapi.py b/ick2/workapi.py index 32c988e..d254515 100644 --- a/ick2/workapi.py +++ b/ick2/workapi.py @@ -20,11 +20,7 @@ class WorkAPI(ick2.APIbase): def __init__(self, state): super().__init__(state) - self._workers = ick2.Workers(state) - self._projects = ick2.Projects(state) - self._ps = ick2.ProjectStatus(state) - self._builds = ick2.Builds(state) - self._type_name = 'work' + self._trans = ick2.TransactionalState(state) def get_routes(self, path): # pragma: no cover return [ @@ -41,38 +37,40 @@ class WorkAPI(ick2.APIbase): ] def get_work(self, **kwargs): - worker = self._get_client_id(**kwargs) - worker_state = self._workers.get_worker(worker) - if not worker_state.get('doing'): - project = self._pick_triggered_project() - if project is None: - doing = {} - else: - self._set_project_status(project['project'], 'building') - - build_id, build_no = self._start_build(project, worker) + worker_id = self._get_client_id(**kwargs) + with self._trans.modify('workers', worker_id) as worker: + doing = worker.get('doing') + if doing: + return doing + + build_id = self._pick_build(worker_id) + if build_id is None: + return {} + + with self._trans.modify('builds', build_id) as build: + build['status'] = 'building' + build['worker'] = worker_id + self._start_log(build_id) - build = self._get_build(build_id) + actions = build['actions'] current_action = build['current_action'] - doing = { 'build_id': build_id, - 'build_number': build_no, - 'worker': worker, - 'project': project['project'], - 'parameters': project.get('parameters', {}), + 'build_number': build['build_number'], + 'worker': worker_id, + 'project': build['project'], + 'parameters': build['parameters'], 'step': actions[current_action], - 'log': '/logs/{}'.format(build_id), + 'log': build['log'], } - worker_state = { - 'worker': worker, - 'doing': doing, - } - self._workers.update_worker(worker_state) + worker.from_dict({ + 'worker': worker_id, + 'doing': doing, + }) - return worker_state['doing'] + return worker['doing'] def _get_client_id(self, **kwargs): claims = kwargs.get('claims', {}) @@ -81,147 +79,103 @@ class WorkAPI(ick2.APIbase): raise ick2.ClientIdMissing() return client_id - def _pick_build_number(self, project): - old_build_no = project.get('next_build_id') - build_no = (old_build_no or 0) + 1 - project['next_build_id'] = build_no - self._projects.update_project(project) - ick2.log.log( - 'info', msg_text='chose build number', - old_build_no=old_build_no, build_no=build_no) - return build_no - - def _pick_triggered_project(self): - projects = self._projects.get_projects() - for project in projects: - ps = self._ps.get_instance(project['project']) - if ps.get('status') == 'triggered': - return project - return None + def _pick_build(self, worker): + def on_worker(build): + return build.get('worker') == worker + + def status(build): + return build.get('status') - def _set_project_status(self, project_name, status): - ps = self._ps.get_instance(project_name) - ps['status'] = status - self._ps.update_instance(project_name, ps) - - def _start_build(self, project, worker): - build_no = self._pick_build_number(project) - build_id = '{}/{}'.format(project['project'], build_no) - - ick2.log.log( - 'info', msg_text='Starting new build', build_id=build_id, - build_no=build_no) - parameters = project.get('parameters', {}) - create_workspace = { - 'action': 'create_workspace', - 'where': 'host', - } - actions = [create_workspace] + self._get_actions(project) - build = { - 'build_id': build_id, - 'build_number': build_no, - 'log': '/logs/{}'.format(build_id), - 'worker': worker, - 'project': project['project'], - 'parameters': parameters, - 'status': 'building', - 'actions': actions, - 'current_action': 0, - } - self._builds.add(build_id, build) - return build_id, build_no - - def _get_actions(self, project): - actions = [] - for pipeline_name in project['pipelines']: - pipeline = self._state.get_resource('pipelines', pipeline_name) - actions.extend(list(pipeline['actions'])) - return actions - - def _get_build(self, build_id): - return self._builds.get(build_id) + def is_building(build): + return status(build) == 'building' + + def is_triggered(build): + return status(build) == 'triggered' + + builds = self._trans.get_resources('builds') + return (self._find_build(builds, on_worker, is_building) or + self._find_build(builds, is_triggered)) + + def _find_build(self, builds, *preds): + for build in builds: + if all(pred(build) for pred in preds): + return build['build_id'] + return None def _start_log(self, build_id): ick2.log.log('info', msg_text='Starting new log', build_id=build_id) - log = { - 'build_id': build_id, - 'log': '', - } - self._state.add_resource('log', str(build_id), log) + with self._trans.new('log', build_id) as r: + r.from_dict({ + 'build_id': build_id, + 'log': '', + }) def update_work(self, update, **kwargs): - if 'worker' not in update: # pragma: no cover - raise ick2.BadUpdate('no worker specified') - - worker_state = self._workers.get_worker(update['worker']) - doing = worker_state.get('doing', {}) - self._check_work_update(doing, update) - - project_name = update['project'] - - self._append_to_build_log(update) - - exit_code = update.get('exit_code') - if exit_code == 0: - build_id = doing['build_id'] - build = self._get_build(build_id) - actions = build['actions'] - current_action = build['current_action'] - if current_action + 1 >= len(actions): - self._set_project_status(project_name, 'idle') - doing = {} - self._finish_build(update) - else: - index = current_action + 1 - build['current_action'] = index - self._update_build(build) - doing['step'] = actions[index] - - worker_state = { - 'worker': update['worker'], - 'doing': doing, - } - self._workers.update_worker(worker_state) - elif exit_code is not None: - assert isinstance(exit_code, int) - assert exit_code != 0 - self._set_project_status(project_name, 'idle') - self._finish_build(update) - - worker_state = { - 'worker': update['worker'], - 'doing': {}, - } - self._workers.update_worker(worker_state) + try: + worker_id = update['worker'] + build_id = update['build_id'] + project_name = update['project'] + exit_code = update.get('exit_code') + except KeyError as e: # pragma: no cover + raise ick2.BadUpdate(str(e)) + + with self._trans.modify('workers', worker_id) as worker: + with self._trans.modify('builds', build_id) as build: + doing = worker.get('doing', {}) + self._check_work_update(doing, update) + self._append_to_build_log(update) + + if exit_code is not None: + if exit_code == 0: + action = self._move_to_next_action(build) + if action is None: + doing = {} + self._finish_build(build, exit_code) + else: + doing['step'] = action + + worker.from_dict({ + 'worker': worker_id, + 'doing': doing, + }) + elif exit_code is not None: + self._finish_build(build, exit_code) + worker.from_dict({ + 'worker': worker_id, + 'doing': {}, + }) def _check_work_update(self, doing, update): # pragma: no cover must_match = ['worker', 'project', 'build_id'] for name in must_match: if name not in update: raise ick2.BadUpdate('{} not specified'.format(name)) - if doing.get(name) != update[name]: + if doing.get(name) is not None and doing.get(name) != update[name]: raise ick2.BadUpdate( '{} differs from current work: {} vs {}'.format( name, doing.get(name), update[name])) def _append_to_build_log(self, update): build_id = update['build_id'] - log = self._state.get_resource('log', str(build_id)) - for kind in ['stdout', 'stderr']: - text = update.get(kind, '') - if text is not None: - log['log'] += text - self._state.update_resource('log', str(build_id), log) - - def _update_build(self, build): - self._builds.update_build(build) - - def _finish_build(self, update): - build_id = update['build_id'] - build = self._get_build(build_id) - build['status'] = update['exit_code'] + with self._trans.modify('log', build_id) as log: + for stream in ['stdout', 'stderr']: + text = update.get(stream, '') + if text is not None: + log['log'] += text + + def _move_to_next_action(self, build): + actions = build['actions'] + current_action = build['current_action'] + if current_action + 1 >= len(actions): + return None + + index = current_action + 1 + build['current_action'] = index + return actions[index] + + def _finish_build(self, build, exit_code): + build['status'] = exit_code build['current_action'] = None - self._update_build(build) def create(self, body, **kwargs): # pragma: no cover pass diff --git a/ick2/workapi_tests.py b/ick2/workapi_tests.py index 1c08a61..05e91a0 100644 --- a/ick2/workapi_tests.py +++ b/ick2/workapi_tests.py @@ -27,8 +27,8 @@ class WorkAPITests(unittest.TestCase): def setUp(self): self.tempdir = tempfile.mkdtemp() self.statedir = os.path.join(self.tempdir, 'state/dir') - self.state = ick2.ControllerState() - self.state.set_state_directory(self.statedir) + self.state = ick2.FilePersistentState() + self.state.set_directory(self.statedir) def tearDown(self): shutil.rmtree(self.tempdir) @@ -58,7 +58,7 @@ class WorkAPITests(unittest.TestCase): def create_worker_api(self): worker = { - 'this': 'that', + 'doing': {}, } claims = { 'aud': 'asterix', @@ -70,16 +70,16 @@ class WorkAPITests(unittest.TestCase): def create_work_api(self): return ick2.WorkAPI(self.state) - def test_worker_gets_no_work_when_no_pipeline_is_triggered(self): + def test_worker_gets_no_work_when_no_builds_have_been_triggered(self): self.create_project_api() self.create_worker_api() work = self.create_work_api() claims = {'aud': 'asterix'} self.assertEqual(work.get_work(claims=claims), {}) - def test_worker_gets_work_when_a_pipeline_is_triggered(self): + def test_worker_gets_work_when_a_build_has_been_triggered(self): projects = self.create_project_api() - projects.set_status('foo', 'triggered') + projects.trigger_project('foo') self.create_worker_api() work = self.create_work_api() expected = { @@ -105,10 +105,16 @@ class WorkAPITests(unittest.TestCase): def test_worker_manager_posts_work_updates(self): projects = self.create_project_api() - projects.set_status('foo', 'triggered') self.create_worker_api() work = self.create_work_api() + # No builds have been triggered, nothing to do. + claims = {'aud': 'asterix'} + self.assertEqual(work.get_work(claims=claims), {}) + + # Trigger a build. + projects.trigger_project('foo') + # Ask for some work. expected = { 'build_id': 'foo/1', @@ -149,8 +155,9 @@ class WorkAPITests(unittest.TestCase): work.update_work(done) # We should get the next step now. + got = work.get_work(claims=claims) expected['step'] = {'shell': 'step-1', 'where': 'host'} - self.assertEqual(work.get_work(claims=claims), expected) + self.assertEqual(got, expected) # Finish the step. done['exit_code'] = 0 @@ -167,14 +174,9 @@ class WorkAPITests(unittest.TestCase): # We now get nothing further to do. self.assertEqual(work.get_work(claims=claims), {}) - # An pipeline status has changed. - self.assertEqual( - projects.get_status('foo'), - {'status': 'idle'}) - def test_worker_manager_posts_failure(self): projects = self.create_project_api() - projects.set_status('foo', 'triggered') + projects.trigger_project('foo') self.create_worker_api() work = self.create_work_api() @@ -211,8 +213,3 @@ class WorkAPITests(unittest.TestCase): # Ask for work again. claims = {'aud': 'asterix'} self.assertEqual(work.get_work(claims=claims), {}) - - # And project status has changed. - self.assertEqual( - projects.get_status('foo'), - {'status': 'idle'}) diff --git a/ick2/workerapi.py b/ick2/workerapi.py index 2af240e..d4b508c 100644 --- a/ick2/workerapi.py +++ b/ick2/workerapi.py @@ -32,9 +32,6 @@ class WorkerAPI(ick2.ResourceApiBase): # pragma: no cover return client_id def create(self, body, **kwargs): - ick2.log.log( - 'trace', msg_text='create worker', body=body, kwargs=kwargs) - client_id = self._get_client_id(**kwargs) body['worker'] = client_id return super().create(body, **kwargs) |