summaryrefslogtreecommitdiff
path: root/ick2/workapi.py
diff options
context:
space:
mode:
authorLars Wirzenius <liw@exolobe1>2018-05-13 15:30:23 +0300
committerLars Wirzenius <liw@liw.fi>2018-05-17 21:44:59 +0300
commitb11d31ef23c5dfee6bfa54afbec47fc8b8bab7b1 (patch)
tree2e6b085f8fb023d53c8ac20a97aef2c7d1c11d4b /ick2/workapi.py
parent531dd2c50bfdfcf50bb37f57cf9fc2b69787adcf (diff)
downloadick2-b11d31ef23c5dfee6bfa54afbec47fc8b8bab7b1.tar.gz
Change: how controller stores persistent data
Replace old State class with new FilePersistentState and TransactionalState classes. Use new Resource class instead of raw dicts. Use context managers for creating, updating resources, to avoid mistakes from accidentally not saving changes. Overall persistence should now be rather simpler. This should open up a possibility for changing the controller to insert more actions into the build graph, to trigger notifcations via the workers.
Diffstat (limited to 'ick2/workapi.py')
-rw-r--r--ick2/workapi.py254
1 files changed, 104 insertions, 150 deletions
diff --git a/ick2/workapi.py b/ick2/workapi.py
index 32c988e..d254515 100644
--- a/ick2/workapi.py
+++ b/ick2/workapi.py
@@ -20,11 +20,7 @@ class WorkAPI(ick2.APIbase):
def __init__(self, state):
super().__init__(state)
- self._workers = ick2.Workers(state)
- self._projects = ick2.Projects(state)
- self._ps = ick2.ProjectStatus(state)
- self._builds = ick2.Builds(state)
- self._type_name = 'work'
+ self._trans = ick2.TransactionalState(state)
def get_routes(self, path): # pragma: no cover
return [
@@ -41,38 +37,40 @@ class WorkAPI(ick2.APIbase):
]
def get_work(self, **kwargs):
- worker = self._get_client_id(**kwargs)
- worker_state = self._workers.get_worker(worker)
- if not worker_state.get('doing'):
- project = self._pick_triggered_project()
- if project is None:
- doing = {}
- else:
- self._set_project_status(project['project'], 'building')
-
- build_id, build_no = self._start_build(project, worker)
+ worker_id = self._get_client_id(**kwargs)
+ with self._trans.modify('workers', worker_id) as worker:
+ doing = worker.get('doing')
+ if doing:
+ return doing
+
+ build_id = self._pick_build(worker_id)
+ if build_id is None:
+ return {}
+
+ with self._trans.modify('builds', build_id) as build:
+ build['status'] = 'building'
+ build['worker'] = worker_id
+
self._start_log(build_id)
- build = self._get_build(build_id)
+
actions = build['actions']
current_action = build['current_action']
-
doing = {
'build_id': build_id,
- 'build_number': build_no,
- 'worker': worker,
- 'project': project['project'],
- 'parameters': project.get('parameters', {}),
+ 'build_number': build['build_number'],
+ 'worker': worker_id,
+ 'project': build['project'],
+ 'parameters': build['parameters'],
'step': actions[current_action],
- 'log': '/logs/{}'.format(build_id),
+ 'log': build['log'],
}
- worker_state = {
- 'worker': worker,
- 'doing': doing,
- }
- self._workers.update_worker(worker_state)
+ worker.from_dict({
+ 'worker': worker_id,
+ 'doing': doing,
+ })
- return worker_state['doing']
+ return worker['doing']
def _get_client_id(self, **kwargs):
claims = kwargs.get('claims', {})
@@ -81,147 +79,103 @@ class WorkAPI(ick2.APIbase):
raise ick2.ClientIdMissing()
return client_id
- def _pick_build_number(self, project):
- old_build_no = project.get('next_build_id')
- build_no = (old_build_no or 0) + 1
- project['next_build_id'] = build_no
- self._projects.update_project(project)
- ick2.log.log(
- 'info', msg_text='chose build number',
- old_build_no=old_build_no, build_no=build_no)
- return build_no
-
- def _pick_triggered_project(self):
- projects = self._projects.get_projects()
- for project in projects:
- ps = self._ps.get_instance(project['project'])
- if ps.get('status') == 'triggered':
- return project
- return None
+ def _pick_build(self, worker):
+ def on_worker(build):
+ return build.get('worker') == worker
+
+ def status(build):
+ return build.get('status')
- def _set_project_status(self, project_name, status):
- ps = self._ps.get_instance(project_name)
- ps['status'] = status
- self._ps.update_instance(project_name, ps)
-
- def _start_build(self, project, worker):
- build_no = self._pick_build_number(project)
- build_id = '{}/{}'.format(project['project'], build_no)
-
- ick2.log.log(
- 'info', msg_text='Starting new build', build_id=build_id,
- build_no=build_no)
- parameters = project.get('parameters', {})
- create_workspace = {
- 'action': 'create_workspace',
- 'where': 'host',
- }
- actions = [create_workspace] + self._get_actions(project)
- build = {
- 'build_id': build_id,
- 'build_number': build_no,
- 'log': '/logs/{}'.format(build_id),
- 'worker': worker,
- 'project': project['project'],
- 'parameters': parameters,
- 'status': 'building',
- 'actions': actions,
- 'current_action': 0,
- }
- self._builds.add(build_id, build)
- return build_id, build_no
-
- def _get_actions(self, project):
- actions = []
- for pipeline_name in project['pipelines']:
- pipeline = self._state.get_resource('pipelines', pipeline_name)
- actions.extend(list(pipeline['actions']))
- return actions
-
- def _get_build(self, build_id):
- return self._builds.get(build_id)
+ def is_building(build):
+ return status(build) == 'building'
+
+ def is_triggered(build):
+ return status(build) == 'triggered'
+
+ builds = self._trans.get_resources('builds')
+ return (self._find_build(builds, on_worker, is_building) or
+ self._find_build(builds, is_triggered))
+
+ def _find_build(self, builds, *preds):
+ for build in builds:
+ if all(pred(build) for pred in preds):
+ return build['build_id']
+ return None
def _start_log(self, build_id):
ick2.log.log('info', msg_text='Starting new log', build_id=build_id)
- log = {
- 'build_id': build_id,
- 'log': '',
- }
- self._state.add_resource('log', str(build_id), log)
+ with self._trans.new('log', build_id) as r:
+ r.from_dict({
+ 'build_id': build_id,
+ 'log': '',
+ })
def update_work(self, update, **kwargs):
- if 'worker' not in update: # pragma: no cover
- raise ick2.BadUpdate('no worker specified')
-
- worker_state = self._workers.get_worker(update['worker'])
- doing = worker_state.get('doing', {})
- self._check_work_update(doing, update)
-
- project_name = update['project']
-
- self._append_to_build_log(update)
-
- exit_code = update.get('exit_code')
- if exit_code == 0:
- build_id = doing['build_id']
- build = self._get_build(build_id)
- actions = build['actions']
- current_action = build['current_action']
- if current_action + 1 >= len(actions):
- self._set_project_status(project_name, 'idle')
- doing = {}
- self._finish_build(update)
- else:
- index = current_action + 1
- build['current_action'] = index
- self._update_build(build)
- doing['step'] = actions[index]
-
- worker_state = {
- 'worker': update['worker'],
- 'doing': doing,
- }
- self._workers.update_worker(worker_state)
- elif exit_code is not None:
- assert isinstance(exit_code, int)
- assert exit_code != 0
- self._set_project_status(project_name, 'idle')
- self._finish_build(update)
-
- worker_state = {
- 'worker': update['worker'],
- 'doing': {},
- }
- self._workers.update_worker(worker_state)
+ try:
+ worker_id = update['worker']
+ build_id = update['build_id']
+ project_name = update['project']
+ exit_code = update.get('exit_code')
+ except KeyError as e: # pragma: no cover
+ raise ick2.BadUpdate(str(e))
+
+ with self._trans.modify('workers', worker_id) as worker:
+ with self._trans.modify('builds', build_id) as build:
+ doing = worker.get('doing', {})
+ self._check_work_update(doing, update)
+ self._append_to_build_log(update)
+
+ if exit_code is not None:
+ if exit_code == 0:
+ action = self._move_to_next_action(build)
+ if action is None:
+ doing = {}
+ self._finish_build(build, exit_code)
+ else:
+ doing['step'] = action
+
+ worker.from_dict({
+ 'worker': worker_id,
+ 'doing': doing,
+ })
+ elif exit_code is not None:
+ self._finish_build(build, exit_code)
+ worker.from_dict({
+ 'worker': worker_id,
+ 'doing': {},
+ })
def _check_work_update(self, doing, update): # pragma: no cover
must_match = ['worker', 'project', 'build_id']
for name in must_match:
if name not in update:
raise ick2.BadUpdate('{} not specified'.format(name))
- if doing.get(name) != update[name]:
+ if doing.get(name) is not None and doing.get(name) != update[name]:
raise ick2.BadUpdate(
'{} differs from current work: {} vs {}'.format(
name, doing.get(name), update[name]))
def _append_to_build_log(self, update):
build_id = update['build_id']
- log = self._state.get_resource('log', str(build_id))
- for kind in ['stdout', 'stderr']:
- text = update.get(kind, '')
- if text is not None:
- log['log'] += text
- self._state.update_resource('log', str(build_id), log)
-
- def _update_build(self, build):
- self._builds.update_build(build)
-
- def _finish_build(self, update):
- build_id = update['build_id']
- build = self._get_build(build_id)
- build['status'] = update['exit_code']
+ with self._trans.modify('log', build_id) as log:
+ for stream in ['stdout', 'stderr']:
+ text = update.get(stream, '')
+ if text is not None:
+ log['log'] += text
+
+ def _move_to_next_action(self, build):
+ actions = build['actions']
+ current_action = build['current_action']
+ if current_action + 1 >= len(actions):
+ return None
+
+ index = current_action + 1
+ build['current_action'] = index
+ return actions[index]
+
+ def _finish_build(self, build, exit_code):
+ build['status'] = exit_code
build['current_action'] = None
- self._update_build(build)
def create(self, body, **kwargs): # pragma: no cover
pass